1use async_trait::async_trait;
7use ferrum_types::{
8 BatchId, InferenceRequest, InferenceResponse, Priority, RequestId, RequestState, Result,
9 SchedulerConfig as TypesSchedulerConfig, SchedulerStats,
10};
11use serde::{Deserialize, Serialize};
12use std::{collections::HashMap, time::Duration};
13
14#[async_trait]
16pub trait Scheduler: Send + Sync {
17 async fn submit(&self, request: InferenceRequest) -> Result<RequestId>;
19
20 async fn next_batch(&self, hint: BatchHint) -> Option<BatchPlan>;
22
23 async fn complete(&self, request_id: RequestId, response: &InferenceResponse) -> Result<()>;
25
26 async fn cancel(&self, request_id: RequestId) -> Result<bool>;
28
29 async fn update_priority(&self, request_id: RequestId, priority: Priority) -> Result<()>;
31
32 fn metrics(&self) -> SchedulerMetrics;
34
35 fn config(&self) -> &TypesSchedulerConfig;
37
38 fn request_state(&self, request_id: &RequestId) -> Option<RequestState> {
40 let _ = request_id;
41 None
42 }
43
44 async fn preempt(&self, _request_id: RequestId) -> Result<PreemptionResult> {
46 Err(ferrum_types::FerrumError::unsupported(
48 "Preemption not supported",
49 ))
50 }
51
52 async fn resume(&self, _request_id: RequestId) -> Result<()> {
54 Err(ferrum_types::FerrumError::unsupported(
56 "Resumption not supported",
57 ))
58 }
59}
60
61#[derive(Debug, Clone)]
63pub struct BatchHint {
64 pub max_batch_size: usize,
66 pub max_tokens: usize,
68 pub target_latency_ms: Option<u64>,
70 pub available_memory: Option<u64>,
72 pub resource_constraints: ResourceConstraints,
74}
75
76impl BatchHint {
77 pub fn simple(max_batch_size: usize) -> Self {
79 Self {
80 max_batch_size,
81 max_tokens: max_batch_size * 2048, target_latency_ms: None,
83 available_memory: None,
84 resource_constraints: ResourceConstraints::default(),
85 }
86 }
87}
88
89#[derive(Debug, Clone, Serialize, Deserialize, Default)]
91pub struct ResourceConstraints {
92 pub max_gpu_memory: Option<u64>,
94 pub max_cpu_memory: Option<u64>,
96 pub max_compute_units: Option<usize>,
98 pub required_devices: Vec<ferrum_types::Device>,
100}
101
102#[derive(Debug, Clone)]
104pub struct BatchPlan {
105 pub batch_id: BatchId,
107 pub requests: Vec<ScheduledRequest>,
109 pub max_sequence_length: usize,
111 pub estimated_time_ms: Option<u64>,
113 pub resource_requirements: BatchResourceRequirements,
115 pub created_at: chrono::DateTime<chrono::Utc>,
117}
118
119impl BatchPlan {
120 pub fn total_tokens(&self) -> usize {
122 self.requests
123 .iter()
124 .map(|req| req.request.sampling_params.max_tokens)
125 .sum()
126 }
127
128 pub fn size(&self) -> usize {
130 self.requests.len()
131 }
132
133 pub fn is_empty(&self) -> bool {
135 self.requests.is_empty()
136 }
137
138 pub fn max_priority(&self) -> Priority {
140 self.requests
141 .iter()
142 .map(|req| req.request.priority)
143 .max()
144 .unwrap_or(Priority::Low)
145 }
146}
147
148#[derive(Debug, Clone)]
150pub struct ScheduledRequest {
151 pub request: InferenceRequest,
153 pub state: RequestState,
155 pub queue_position: Option<usize>,
157 pub estimated_wait_time: Option<Duration>,
159 pub tokens_processed: usize,
161 pub allocated_resources: AllocatedResources,
163 pub submitted_at: chrono::DateTime<chrono::Utc>,
165 pub started_at: Option<chrono::DateTime<chrono::Utc>>,
167}
168
169impl ScheduledRequest {
170 pub fn new(request: InferenceRequest) -> Self {
172 Self {
173 request,
174 state: RequestState::Waiting,
175 queue_position: None,
176 estimated_wait_time: None,
177 tokens_processed: 0,
178 allocated_resources: AllocatedResources::default(),
179 submitted_at: chrono::Utc::now(),
180 started_at: None,
181 }
182 }
183
184 pub fn age(&self) -> Duration {
186 (chrono::Utc::now() - self.submitted_at)
187 .to_std()
188 .unwrap_or_default()
189 }
190
191 pub fn processing_time(&self) -> Option<Duration> {
193 self.started_at
194 .map(|start| (chrono::Utc::now() - start).to_std().unwrap_or_default())
195 }
196}
197
198#[derive(Debug, Clone, Default)]
200pub struct AllocatedResources {
201 pub kv_cache_blocks: Vec<ferrum_types::BlockId>,
203 pub gpu_memory: u64,
205 pub cpu_memory: u64,
207 pub compute_units: usize,
209}
210
/// Aggregate resources a [`BatchPlan`] is expected to need.
///
/// Derives `Default` (all zeros) to match `AllocatedResources`. Derives `PartialEq`/`Eq`
/// so plans' requirements can be compared directly.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct BatchResourceRequirements {
    /// GPU memory needed (units not fixed here; presumably bytes — confirm).
    pub gpu_memory: u64,
    /// CPU memory needed (units not fixed here; presumably bytes — confirm).
    pub cpu_memory: u64,
    /// Number of KV-cache blocks needed.
    pub kv_cache_blocks: usize,
    /// Number of compute units needed.
    pub compute_units: usize,
}
223
224#[derive(Debug, Clone)]
226pub struct PreemptionResult {
227 pub success: bool,
229 pub saved_state: Option<PreemptionState>,
231 pub freed_resources: AllocatedResources,
233}
234
235#[derive(Debug, Clone)]
237pub struct PreemptionState {
238 pub kv_cache_checkpoint: Vec<u8>,
240 pub tokens_processed: usize,
242 pub generation_state: HashMap<String, serde_json::Value>,
244}
245
246pub type SchedulerConfig = TypesSchedulerConfig;
248
249#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
251pub enum SchedulingPolicy {
252 FCFS,
254 Priority,
256 FairShare,
258 SJF,
260 ResourceAware,
262 SlaAware,
264}
265
266#[derive(Debug, Clone, Serialize, Deserialize)]
268pub struct FairShareConfig {
269 pub client_shares: HashMap<String, f32>,
271 pub default_share: f32,
273 pub enforcement_strictness: f32,
275}
276
277#[derive(Debug, Clone, Serialize, Deserialize)]
279pub struct SlaConfig {
280 pub enabled: bool,
282 pub default_sla: SlaRequirements,
284 pub client_slas: HashMap<String, SlaRequirements>,
286}
287
288#[derive(Debug, Clone, Serialize, Deserialize)]
290pub struct SlaRequirements {
291 pub max_latency_p95_ms: u64,
293 pub max_latency_p99_ms: u64,
295 pub min_throughput_rps: f32,
297 pub availability_percent: f32,
299}
300
301#[derive(Debug, Clone, Serialize, Deserialize, Default)]
303pub struct ResourceLimits {
304 pub max_gpu_memory: Option<u64>,
306 pub max_cpu_memory: Option<u64>,
308 pub max_kv_cache_blocks: Option<usize>,
310 pub per_client_limits: HashMap<String, ClientResourceLimits>,
312}
313
314#[derive(Debug, Clone, Serialize, Deserialize)]
316pub struct ClientResourceLimits {
317 pub max_concurrent_requests: usize,
319 pub max_gpu_memory: Option<u64>,
321 pub max_requests_per_minute: Option<u32>,
323}
324
325pub type SchedulerMetrics = SchedulerStats;
326
327#[async_trait]
329pub trait AdvancedScheduler: Scheduler {
330 async fn enable_resource_awareness(&mut self, config: ResourceAwarenessConfig) -> Result<()>;
332
333 async fn set_admission_policy(&mut self, policy: Box<dyn AdmissionPolicy>) -> Result<()>;
335
336 async fn configure_dynamic_batching(&mut self, config: DynamicBatchingConfig) -> Result<()>;
338
339 fn queue_analysis(&self) -> QueueAnalysis;
341
342 async fn simulate_load(
344 &self,
345 workload: &SimulatedWorkload,
346 ) -> Result<SchedulingSimulationResult>;
347}
348
349#[derive(Debug, Clone, Serialize, Deserialize)]
351pub struct ResourceAwarenessConfig {
352 pub enable_memory_awareness: bool,
354 pub enable_compute_awareness: bool,
356 pub prediction_horizon_ms: u64,
358 pub safety_margin: f32,
360}
361
362pub trait AdmissionPolicy: Send + Sync {
364 fn should_admit(
366 &self,
367 request: &InferenceRequest,
368 current_metrics: &SchedulerMetrics,
369 ) -> AdmissionDecision;
370
371 fn name(&self) -> &str;
373}
374
/// Verdict produced by [`AdmissionPolicy::should_admit`].
///
/// Derives `PartialEq`/`Eq` so callers and tests can compare verdicts directly.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum AdmissionDecision {
    /// Admit the request.
    Accept,
    /// Refuse the request; the string carries the reason.
    Reject(String),
    /// Admit the request after the given delay.
    AcceptWithDelay(Duration),
}
385
386#[derive(Debug, Clone, Serialize, Deserialize)]
388pub struct DynamicBatchingConfig {
389 pub min_batch_size: usize,
391 pub max_batch_size: usize,
393 pub batch_timeout_ms: u64,
395 pub enable_adaptive_sizing: bool,
397 pub target_utilization: f32,
399}
400
401#[derive(Debug, Clone)]
403pub struct QueueAnalysis {
404 pub queue_depth_history: Vec<(chrono::DateTime<chrono::Utc>, usize)>,
406 pub wait_time_distribution: WaitTimeDistribution,
408 pub request_patterns: RequestPatternAnalysis,
410 pub bottlenecks: Vec<BottleneckAnalysis>,
412}
413
/// Summary statistics of request wait times, all in milliseconds.
#[derive(Debug, Clone)]
pub struct WaitTimeDistribution {
    /// Median (50th percentile).
    pub p50_ms: f64,
    /// 95th percentile.
    pub p95_ms: f64,
    /// 99th percentile.
    pub p99_ms: f64,
    /// Maximum observed.
    pub max_ms: f64,
    /// Arithmetic mean.
    pub mean_ms: f64,
}
428
429#[derive(Debug, Clone)]
431pub struct RequestPatternAnalysis {
432 pub peak_times: Vec<chrono::DateTime<chrono::Utc>>,
434 pub rate_trend: RateTrend,
436 pub seasonality: SeasonalityPattern,
438}
439
/// Direction of the request rate over the analysed window.
///
/// Derives `PartialEq`/`Eq`/`Hash` so trends can be compared and used as map keys.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum RateTrend {
    /// Rate is rising.
    Increasing,
    /// Rate is falling.
    Decreasing,
    /// Rate is roughly constant.
    Stable,
    /// Rate fluctuates without a clear direction.
    Volatile,
}
448
/// Periodic load profiles at three granularities.
///
/// Bucket counts and normalisation are not fixed here (presumably 24/7/... buckets of
/// relative load — confirm).
#[derive(Debug, Clone)]
pub struct SeasonalityPattern {
    /// Hourly profile.
    pub hourly_pattern: Vec<f32>,
    /// Daily profile.
    pub daily_pattern: Vec<f32>,
    /// Weekly profile.
    pub weekly_pattern: Vec<f32>,
}
459
460#[derive(Debug, Clone)]
462pub struct BottleneckAnalysis {
463 pub bottleneck_type: BottleneckType,
465 pub severity: f32,
467 pub description: String,
469 pub mitigation: String,
471}
472
/// Category of a scheduling bottleneck.
///
/// Derives `PartialEq`/`Eq`/`Hash` so categories can be compared and grouped.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum BottleneckType {
    /// Memory-bound.
    Memory,
    /// Compute-bound.
    Compute,
    /// I/O-bound.
    IO,
    /// Limited by scheduling itself.
    Scheduling,
    /// Network-bound.
    Network,
}
487
488#[derive(Debug, Clone)]
490pub struct SimulatedWorkload {
491 pub arrival_pattern: ArrivalPattern,
493 pub size_distribution: SizeDistribution,
495 pub duration_seconds: u64,
497}
498
/// Request arrival model for a [`SimulatedWorkload`].
#[derive(Debug, Clone)]
pub enum ArrivalPattern {
    /// Fixed rate, in requests per second.
    Constant { rate_rps: f32 },
    /// Poisson arrivals with rate parameter `lambda`.
    Poisson { lambda: f32 },
    /// Alternating burst and quiet phases.
    Bursty {
        /// Rate during bursts.
        burst_rate: f32,
        /// Rate between bursts.
        quiet_rate: f32,
        /// Burst length, in seconds.
        burst_duration_s: f32,
    },
    /// Base rate plus peaks.
    Seasonal {
        /// Rate outside peaks.
        base_rate: f32,
        /// Peak descriptors as pairs; meaning of each component not fixed here
        /// (presumably (time, rate) — confirm).
        peaks: Vec<(f32, f32)>,
    },
}
518
/// Request-size model (in tokens) for a [`SimulatedWorkload`].
#[derive(Debug, Clone)]
pub enum SizeDistribution {
    /// Every request has exactly `tokens` tokens.
    Fixed { tokens: usize },
    /// Uniform between `min_tokens` and `max_tokens`.
    Uniform {
        /// Lower bound.
        min_tokens: usize,
        /// Upper bound.
        max_tokens: usize,
    },
    /// Normal distribution with the given mean and standard deviation.
    Normal { mean: f32, std_dev: f32 },
    /// Log-normal distribution with parameters `mu` and `sigma`.
    LogNormal { mu: f32, sigma: f32 },
}
534
535#[derive(Debug, Clone)]
537pub struct SchedulingSimulationResult {
538 pub total_requests: u64,
540 pub successful_requests: u64,
542 pub failed_requests: u64,
544 pub avg_latency_ms: f64,
546 pub p95_latency_ms: f64,
548 pub p99_latency_ms: f64,
550 pub throughput_rps: f32,
552 pub resource_utilization: Option<ResourceStats>,
554 pub bottlenecks: Vec<BottleneckAnalysis>,
556}
557
558#[derive(Debug, Clone, Serialize, Deserialize, Default)]
559pub struct ResourceStats {
560 pub gpu_memory_bytes: Option<u64>,
561 pub cpu_memory_bytes: Option<u64>,
562 pub compute_utilization: Option<f32>,
563}