1use async_trait::async_trait;
7use ferrum_types::{
8 BatchId, InferenceRequest, InferenceResponse, Priority, RequestId, RequestState, Result,
9 SchedulerConfig as TypesSchedulerConfig, SchedulerStats,
10};
11use serde::{Deserialize, Serialize};
12use std::{collections::HashMap, time::Duration};
13
/// Core scheduling interface for inference requests.
///
/// Implementations decide how submitted requests are queued, batched, and
/// (optionally) preempted. All async methods take `&self`, so implementations
/// are expected to manage their own interior mutability.
#[async_trait]
pub trait Scheduler: Send + Sync {
    /// Submits a request for scheduling and returns its assigned id.
    async fn submit(&self, request: InferenceRequest) -> Result<RequestId>;

    /// Produces the next batch to execute, or `None` if nothing is runnable.
    /// `hint` carries the batching limits the caller wants respected.
    async fn next_batch(&self, hint: BatchHint) -> Option<BatchPlan>;

    /// Marks a request as finished, together with the response it produced.
    async fn complete(&self, request_id: RequestId, response: &InferenceResponse) -> Result<()>;

    /// Cancels a request. The returned bool presumably indicates whether the
    /// request was found and cancelled — confirm with implementations.
    async fn cancel(&self, request_id: RequestId) -> Result<bool>;

    /// Changes the priority of an already-submitted request.
    async fn update_priority(&self, request_id: RequestId, priority: Priority) -> Result<()>;

    /// Returns a snapshot of scheduler statistics.
    fn metrics(&self) -> SchedulerMetrics;

    /// Returns the configuration the scheduler was built with.
    fn config(&self) -> &TypesSchedulerConfig;

    /// Looks up the current state of a request.
    ///
    /// The default implementation returns `None` (per-request state tracking
    /// is optional for implementors).
    fn request_state(&self, request_id: &RequestId) -> Option<RequestState> {
        let _ = request_id;
        None
    }

    /// Preempts a running request, freeing its resources.
    ///
    /// The default implementation reports the operation as unsupported.
    async fn preempt(&self, _request_id: RequestId) -> Result<PreemptionResult> {
        Err(ferrum_types::FerrumError::unsupported(
            "Preemption not supported",
        ))
    }

    /// Resumes a previously preempted request.
    ///
    /// The default implementation reports the operation as unsupported.
    async fn resume(&self, _request_id: RequestId) -> Result<()> {
        Err(ferrum_types::FerrumError::unsupported(
            "Resumption not supported",
        ))
    }
}
60
/// Caller-supplied constraints for forming the next batch
/// (see [`Scheduler::next_batch`]).
#[derive(Debug, Clone)]
pub struct BatchHint {
    /// Maximum number of requests allowed in the batch.
    pub max_batch_size: usize,
    /// Maximum total token budget for the batch.
    pub max_tokens: usize,
    /// Target latency for the batch in milliseconds, if any.
    pub target_latency_ms: Option<u64>,
    /// Memory currently available for the batch, if known.
    /// NOTE(review): unit is presumably bytes — confirm with callers.
    pub available_memory: Option<u64>,
    /// Additional resource constraints the batch must satisfy.
    pub resource_constraints: ResourceConstraints,
}
75
76impl BatchHint {
77 pub fn simple(max_batch_size: usize) -> Self {
79 Self {
80 max_batch_size,
81 max_tokens: max_batch_size * 2048, target_latency_ms: None,
83 available_memory: None,
84 resource_constraints: ResourceConstraints::default(),
85 }
86 }
87}
88
89#[derive(Debug, Clone, Serialize, Deserialize)]
91pub struct ResourceConstraints {
92 pub max_gpu_memory: Option<u64>,
94 pub max_cpu_memory: Option<u64>,
96 pub max_compute_units: Option<usize>,
98 pub required_devices: Vec<ferrum_types::Device>,
100}
101
102impl Default for ResourceConstraints {
103 fn default() -> Self {
104 Self {
105 max_gpu_memory: None,
106 max_cpu_memory: None,
107 max_compute_units: None,
108 required_devices: vec![],
109 }
110 }
111}
112
/// A concrete batch of requests selected by the scheduler for execution.
#[derive(Debug, Clone)]
pub struct BatchPlan {
    /// Identifier for this batch.
    pub batch_id: BatchId,
    /// Requests included in the batch, with their scheduling state.
    pub requests: Vec<ScheduledRequest>,
    /// Maximum sequence length across the batch.
    pub max_sequence_length: usize,
    /// Estimated execution time in milliseconds, if predicted.
    pub estimated_time_ms: Option<u64>,
    /// Aggregate resources the batch is expected to need.
    pub resource_requirements: BatchResourceRequirements,
    /// When this plan was created.
    pub created_at: chrono::DateTime<chrono::Utc>,
}
129
130impl BatchPlan {
131 pub fn total_tokens(&self) -> usize {
133 self.requests
134 .iter()
135 .map(|req| req.request.sampling_params.max_tokens)
136 .sum()
137 }
138
139 pub fn size(&self) -> usize {
141 self.requests.len()
142 }
143
144 pub fn is_empty(&self) -> bool {
146 self.requests.is_empty()
147 }
148
149 pub fn max_priority(&self) -> Priority {
151 self.requests
152 .iter()
153 .map(|req| req.request.priority)
154 .max()
155 .unwrap_or(Priority::Low)
156 }
157}
158
/// An inference request plus the bookkeeping the scheduler tracks for it.
#[derive(Debug, Clone)]
pub struct ScheduledRequest {
    /// The underlying inference request.
    pub request: InferenceRequest,
    /// Current lifecycle state of the request.
    pub state: RequestState,
    /// Position in the queue, if currently queued.
    pub queue_position: Option<usize>,
    /// Predicted wait time before execution, if estimated.
    pub estimated_wait_time: Option<Duration>,
    /// Number of tokens processed so far.
    pub tokens_processed: usize,
    /// Resources currently allocated to this request.
    pub allocated_resources: AllocatedResources,
    /// When the request was submitted to the scheduler.
    pub submitted_at: chrono::DateTime<chrono::Utc>,
    /// When execution started, if it has.
    pub started_at: Option<chrono::DateTime<chrono::Utc>>,
}
179
180impl ScheduledRequest {
181 pub fn new(request: InferenceRequest) -> Self {
183 Self {
184 request,
185 state: RequestState::Waiting,
186 queue_position: None,
187 estimated_wait_time: None,
188 tokens_processed: 0,
189 allocated_resources: AllocatedResources::default(),
190 submitted_at: chrono::Utc::now(),
191 started_at: None,
192 }
193 }
194
195 pub fn age(&self) -> Duration {
197 (chrono::Utc::now() - self.submitted_at)
198 .to_std()
199 .unwrap_or_default()
200 }
201
202 pub fn processing_time(&self) -> Option<Duration> {
204 self.started_at
205 .map(|start| (chrono::Utc::now() - start).to_std().unwrap_or_default())
206 }
207}
208
/// Resources held by a single scheduled request.
#[derive(Debug, Clone, Default)]
pub struct AllocatedResources {
    /// KV-cache blocks reserved for the request.
    pub kv_cache_blocks: Vec<ferrum_types::BlockId>,
    /// GPU memory allocated (presumably bytes — confirm with the allocator).
    pub gpu_memory: u64,
    /// CPU memory allocated (presumably bytes — confirm with the allocator).
    pub cpu_memory: u64,
    /// Compute units allocated.
    pub compute_units: usize,
}
221
/// Aggregate resources a [`BatchPlan`] is expected to consume.
#[derive(Debug, Clone)]
pub struct BatchResourceRequirements {
    /// GPU memory required (presumably bytes — confirm with the allocator).
    pub gpu_memory: u64,
    /// CPU memory required (presumably bytes — confirm with the allocator).
    pub cpu_memory: u64,
    /// Number of KV-cache blocks required.
    pub kv_cache_blocks: usize,
    /// Number of compute units required.
    pub compute_units: usize,
}
234
/// Outcome of a preemption attempt (see [`Scheduler::preempt`]).
#[derive(Debug, Clone)]
pub struct PreemptionResult {
    /// Whether the preemption succeeded.
    pub success: bool,
    /// State saved so the request can later be resumed, if any.
    pub saved_state: Option<PreemptionState>,
    /// Resources released by the preemption.
    pub freed_resources: AllocatedResources,
}
245
/// Snapshot of a request's progress captured at preemption time, used to
/// resume it later (see [`Scheduler::resume`]).
#[derive(Debug, Clone)]
pub struct PreemptionState {
    /// Serialized KV-cache checkpoint.
    /// NOTE(review): serialization format not evident here — see producer.
    pub kv_cache_checkpoint: Vec<u8>,
    /// Tokens processed before preemption.
    pub tokens_processed: usize,
    /// Implementation-specific generation state, keyed by name.
    pub generation_state: HashMap<String, serde_json::Value>,
}
256
257pub type SchedulerConfig = TypesSchedulerConfig;
259
260#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
262pub enum SchedulingPolicy {
263 FCFS,
265 Priority,
267 FairShare,
269 SJF,
271 ResourceAware,
273 SlaAware,
275}
276
/// Configuration for the fair-share scheduling policy.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct FairShareConfig {
    /// Per-client share weights, keyed by client id.
    pub client_shares: HashMap<String, f32>,
    /// Share weight used for clients not listed in `client_shares`.
    pub default_share: f32,
    /// How strictly shares are enforced.
    /// NOTE(review): range/scale not evident here — confirm with the policy impl.
    pub enforcement_strictness: f32,
}
287
/// Configuration for SLA-aware scheduling.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SlaConfig {
    /// Whether SLA enforcement is active.
    pub enabled: bool,
    /// SLA applied to clients with no specific entry in `client_slas`.
    pub default_sla: SlaRequirements,
    /// Per-client SLA overrides, keyed by client id.
    pub client_slas: HashMap<String, SlaRequirements>,
}
298
/// Service-level targets for a client or for the system default.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SlaRequirements {
    /// Maximum acceptable p95 latency in milliseconds.
    pub max_latency_p95_ms: u64,
    /// Maximum acceptable p99 latency in milliseconds.
    pub max_latency_p99_ms: u64,
    /// Minimum required throughput in requests per second.
    pub min_throughput_rps: f32,
    /// Required availability as a percentage.
    pub availability_percent: f32,
}
311
312#[derive(Debug, Clone, Serialize, Deserialize)]
314pub struct ResourceLimits {
315 pub max_gpu_memory: Option<u64>,
317 pub max_cpu_memory: Option<u64>,
319 pub max_kv_cache_blocks: Option<usize>,
321 pub per_client_limits: HashMap<String, ClientResourceLimits>,
323}
324
325impl Default for ResourceLimits {
326 fn default() -> Self {
327 Self {
328 max_gpu_memory: None,
329 max_cpu_memory: None,
330 max_kv_cache_blocks: None,
331 per_client_limits: HashMap::new(),
332 }
333 }
334}
335
/// Resource limits applied to a single client.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ClientResourceLimits {
    /// Maximum number of in-flight requests for the client.
    pub max_concurrent_requests: usize,
    /// GPU memory cap for the client, if any.
    pub max_gpu_memory: Option<u64>,
    /// Request-rate cap per minute, if any.
    pub max_requests_per_minute: Option<u32>,
}
346
347pub type SchedulerMetrics = SchedulerStats;
348
/// Extended scheduler capabilities layered on top of the core [`Scheduler`]
/// API: resource awareness, admission control, dynamic batching, queue
/// analysis, and load simulation.
#[async_trait]
pub trait AdvancedScheduler: Scheduler {
    /// Turns on resource-aware scheduling with the given configuration.
    async fn enable_resource_awareness(&mut self, config: ResourceAwarenessConfig) -> Result<()>;

    /// Installs the policy that decides whether new requests are admitted.
    async fn set_admission_policy(&mut self, policy: Box<dyn AdmissionPolicy>) -> Result<()>;

    /// Reconfigures dynamic batching behavior.
    async fn configure_dynamic_batching(&mut self, config: DynamicBatchingConfig) -> Result<()>;

    /// Returns an analysis of recent queue behavior.
    fn queue_analysis(&self) -> QueueAnalysis;

    /// Runs a simulated workload through the scheduler and reports the
    /// predicted outcome.
    async fn simulate_load(
        &self,
        workload: &SimulatedWorkload,
    ) -> Result<SchedulingSimulationResult>;
}
370
/// Configuration for resource-aware scheduling
/// (see [`AdvancedScheduler::enable_resource_awareness`]).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ResourceAwarenessConfig {
    /// Whether memory availability influences scheduling decisions.
    pub enable_memory_awareness: bool,
    /// Whether compute availability influences scheduling decisions.
    pub enable_compute_awareness: bool,
    /// How far ahead resource usage is predicted, in milliseconds.
    pub prediction_horizon_ms: u64,
    /// Safety margin applied to resource estimates.
    /// NOTE(review): scale (fraction vs. percent) not evident here — confirm.
    pub safety_margin: f32,
}
383
/// Decides whether an incoming request should be admitted to the queue
/// (see [`AdvancedScheduler::set_admission_policy`]).
pub trait AdmissionPolicy: Send + Sync {
    /// Evaluates `request` against the scheduler's current metrics and
    /// returns the admission decision.
    fn should_admit(
        &self,
        request: &InferenceRequest,
        current_metrics: &SchedulerMetrics,
    ) -> AdmissionDecision;

    /// Human-readable name of this policy.
    fn name(&self) -> &str;
}
396
/// Result of an admission-control check.
#[derive(Debug, Clone)]
pub enum AdmissionDecision {
    /// Admit the request immediately.
    Accept,
    /// Refuse the request; the string carries the reason.
    Reject(String),
    /// Admit the request after the given delay.
    AcceptWithDelay(Duration),
}
407
/// Configuration for dynamic batch sizing
/// (see [`AdvancedScheduler::configure_dynamic_batching`]).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DynamicBatchingConfig {
    /// Smallest batch the scheduler will dispatch.
    pub min_batch_size: usize,
    /// Largest batch the scheduler will dispatch.
    pub max_batch_size: usize,
    /// How long to wait for more requests before dispatching, in milliseconds.
    pub batch_timeout_ms: u64,
    /// Whether batch size adapts to observed load.
    pub enable_adaptive_sizing: bool,
    /// Utilization level adaptive sizing aims for.
    /// NOTE(review): scale (fraction vs. percent) not evident here — confirm.
    pub target_utilization: f32,
}
422
/// Analysis of queue behavior returned by
/// [`AdvancedScheduler::queue_analysis`].
#[derive(Debug, Clone)]
pub struct QueueAnalysis {
    /// Historical queue depth samples as (timestamp, depth) pairs.
    pub queue_depth_history: Vec<(chrono::DateTime<chrono::Utc>, usize)>,
    /// Distribution of request wait times.
    pub wait_time_distribution: WaitTimeDistribution,
    /// Patterns detected in request arrival behavior.
    pub request_patterns: RequestPatternAnalysis,
    /// Bottlenecks identified in the scheduling pipeline.
    pub bottlenecks: Vec<BottleneckAnalysis>,
}
435
/// Summary statistics of request wait times, all in milliseconds.
#[derive(Debug, Clone)]
pub struct WaitTimeDistribution {
    /// Median wait time.
    pub p50_ms: f64,
    /// 95th-percentile wait time.
    pub p95_ms: f64,
    /// 99th-percentile wait time.
    pub p99_ms: f64,
    /// Longest observed wait time.
    pub max_ms: f64,
    /// Mean wait time.
    pub mean_ms: f64,
}
450
/// Patterns detected in request arrival behavior.
#[derive(Debug, Clone)]
pub struct RequestPatternAnalysis {
    /// Timestamps at which traffic peaked.
    pub peak_times: Vec<chrono::DateTime<chrono::Utc>>,
    /// Overall trend in request rate.
    pub rate_trend: RateTrend,
    /// Periodic (hourly/daily/weekly) patterns in traffic.
    pub seasonality: SeasonalityPattern,
}
461
/// Direction of the observed request-rate trend.
#[derive(Debug, Clone, Copy)]
pub enum RateTrend {
    /// Request rate is trending upward.
    Increasing,
    /// Request rate is trending downward.
    Decreasing,
    /// Request rate is roughly flat.
    Stable,
    /// Request rate fluctuates without a clear direction.
    Volatile,
}
470
/// Periodic traffic patterns at hourly, daily, and weekly granularity.
/// NOTE(review): bucket counts and value scale (e.g. 24 hourly buckets of
/// relative load) are not evident here — confirm with the producer.
#[derive(Debug, Clone)]
pub struct SeasonalityPattern {
    /// Per-hour traffic pattern.
    pub hourly_pattern: Vec<f32>,
    /// Per-day traffic pattern.
    pub daily_pattern: Vec<f32>,
    /// Per-week traffic pattern.
    pub weekly_pattern: Vec<f32>,
}
481
/// A bottleneck detected in the scheduling pipeline.
#[derive(Debug, Clone)]
pub struct BottleneckAnalysis {
    /// Which resource or subsystem is the bottleneck.
    pub bottleneck_type: BottleneckType,
    /// How severe the bottleneck is.
    /// NOTE(review): scale not evident here — confirm with the producer.
    pub severity: f32,
    /// Human-readable description of the bottleneck.
    pub description: String,
    /// Suggested mitigation.
    pub mitigation: String,
}
494
/// Category of resource or subsystem a bottleneck occurs in.
#[derive(Debug, Clone, Copy)]
pub enum BottleneckType {
    /// Memory capacity or bandwidth.
    Memory,
    /// Compute throughput.
    Compute,
    /// Input/output throughput.
    IO,
    /// The scheduling logic itself.
    Scheduling,
    /// Network throughput or latency.
    Network,
}
509
/// Synthetic workload description fed to
/// [`AdvancedScheduler::simulate_load`].
#[derive(Debug, Clone)]
pub struct SimulatedWorkload {
    /// How simulated requests arrive over time.
    pub arrival_pattern: ArrivalPattern,
    /// Distribution of simulated request sizes.
    pub size_distribution: SizeDistribution,
    /// How long the simulation runs, in seconds.
    pub duration_seconds: u64,
}
520
/// Request arrival process used when simulating load.
#[derive(Debug, Clone)]
pub enum ArrivalPattern {
    /// Fixed arrival rate in requests per second.
    Constant { rate_rps: f32 },
    /// Poisson arrivals with mean rate `lambda`.
    Poisson { lambda: f32 },
    /// Alternating bursts and quiet periods.
    Bursty {
        /// Arrival rate during a burst.
        burst_rate: f32,
        /// Arrival rate between bursts.
        quiet_rate: f32,
        /// Duration of each burst, in seconds.
        burst_duration_s: f32,
    },
    /// Base rate modulated by periodic peaks.
    Seasonal {
        /// Baseline arrival rate.
        base_rate: f32,
        /// Peak modifiers.
        /// NOTE(review): meaning of each `(f32, f32)` pair is not evident
        /// here — confirm with the simulator.
        peaks: Vec<(f32, f32)>,
    },
}
540
/// Distribution of request sizes (in tokens) for simulated workloads.
#[derive(Debug, Clone)]
pub enum SizeDistribution {
    /// Every request has exactly `tokens` tokens.
    Fixed { tokens: usize },
    /// Sizes drawn uniformly between the bounds.
    Uniform {
        min_tokens: usize,
        max_tokens: usize,
    },
    /// Sizes drawn from a normal distribution.
    Normal { mean: f32, std_dev: f32 },
    /// Sizes drawn from a log-normal distribution with parameters `mu`
    /// and `sigma`.
    LogNormal { mu: f32, sigma: f32 },
}
556
/// Outcome of a scheduling simulation
/// (see [`AdvancedScheduler::simulate_load`]).
#[derive(Debug, Clone)]
pub struct SchedulingSimulationResult {
    /// Total number of simulated requests.
    pub total_requests: u64,
    /// Requests that completed successfully.
    pub successful_requests: u64,
    /// Requests that failed.
    pub failed_requests: u64,
    /// Mean request latency in milliseconds.
    pub avg_latency_ms: f64,
    /// 95th-percentile latency in milliseconds.
    pub p95_latency_ms: f64,
    /// 99th-percentile latency in milliseconds.
    pub p99_latency_ms: f64,
    /// Achieved throughput in requests per second.
    pub throughput_rps: f32,
    /// Resource utilization observed during the run, if collected.
    pub resource_utilization: Option<ResourceStats>,
    /// Bottlenecks identified during the simulation.
    pub bottlenecks: Vec<BottleneckAnalysis>,
}
579
/// Observed resource usage; `None` fields were not measured.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct ResourceStats {
    /// GPU memory used, in bytes.
    pub gpu_memory_bytes: Option<u64>,
    /// CPU memory used, in bytes.
    pub cpu_memory_bytes: Option<u64>,
    /// Compute utilization.
    /// NOTE(review): scale (fraction vs. percent) not evident here — confirm.
    pub compute_utilization: Option<f32>,
}