Skip to main content

ferrum_interfaces/
scheduler.rs

//! Unified scheduler interface with resource awareness and SLA support.
//!
//! This module defines the single scheduler interface shared by the crate,
//! replacing the conflicting scheduler definitions that previously existed.
5
6use async_trait::async_trait;
7use ferrum_types::{
8    BatchId, InferenceRequest, InferenceResponse, Priority, RequestId, RequestState, Result,
9    SchedulerConfig as TypesSchedulerConfig, SchedulerStats,
10};
11use serde::{Deserialize, Serialize};
12use std::{collections::HashMap, time::Duration};
13
14/// Main scheduler trait for request management and batching
15#[async_trait]
16pub trait Scheduler: Send + Sync {
17    /// Submit new inference request
18    async fn submit(&self, request: InferenceRequest) -> Result<RequestId>;
19
20    /// Get next batch of requests to execute
21    async fn next_batch(&self, hint: BatchHint) -> Option<BatchPlan>;
22
23    /// Mark request as completed
24    async fn complete(&self, request_id: RequestId, response: &InferenceResponse) -> Result<()>;
25
26    /// Cancel pending request
27    async fn cancel(&self, request_id: RequestId) -> Result<bool>;
28
29    /// Update request priority
30    async fn update_priority(&self, request_id: RequestId, priority: Priority) -> Result<()>;
31
32    /// Get scheduler metrics
33    fn metrics(&self) -> SchedulerMetrics;
34
35    /// Get scheduler configuration
36    fn config(&self) -> &TypesSchedulerConfig;
37
38    /// Get current request state if the request is tracked by scheduler.
39    fn request_state(&self, request_id: &RequestId) -> Option<RequestState> {
40        let _ = request_id;
41        None
42    }
43
44    /// Preempt running request (if supported)
45    async fn preempt(&self, _request_id: RequestId) -> Result<PreemptionResult> {
46        // Default implementation: preemption not supported
47        Err(ferrum_types::FerrumError::unsupported(
48            "Preemption not supported",
49        ))
50    }
51
52    /// Resume preempted request
53    async fn resume(&self, _request_id: RequestId) -> Result<()> {
54        // Default implementation: resumption not supported
55        Err(ferrum_types::FerrumError::unsupported(
56            "Resumption not supported",
57        ))
58    }
59}
60
61/// Batch hint for scheduler optimization
62#[derive(Debug, Clone)]
63pub struct BatchHint {
64    /// Maximum batch size
65    pub max_batch_size: usize,
66    /// Maximum total tokens in batch
67    pub max_tokens: usize,
68    /// Target latency for batch formation
69    pub target_latency_ms: Option<u64>,
70    /// Available memory for batch
71    pub available_memory: Option<u64>,
72    /// Resource constraints
73    pub resource_constraints: ResourceConstraints,
74}
75
76impl BatchHint {
77    /// Create simple batch hint with size limit
78    pub fn simple(max_batch_size: usize) -> Self {
79        Self {
80            max_batch_size,
81            max_tokens: max_batch_size * 2048, // Default reasonable token limit
82            target_latency_ms: None,
83            available_memory: None,
84            resource_constraints: ResourceConstraints::default(),
85        }
86    }
87}
88
89/// Resource constraints for scheduling
90#[derive(Debug, Clone, Serialize, Deserialize, Default)]
91pub struct ResourceConstraints {
92    /// Maximum GPU memory usage
93    pub max_gpu_memory: Option<u64>,
94    /// Maximum CPU memory usage
95    pub max_cpu_memory: Option<u64>,
96    /// Maximum compute units
97    pub max_compute_units: Option<usize>,
98    /// Required device types
99    pub required_devices: Vec<ferrum_types::Device>,
100}
101
102/// Batch execution plan
103#[derive(Debug, Clone)]
104pub struct BatchPlan {
105    /// Unique batch identifier
106    pub batch_id: BatchId,
107    /// Requests included in this batch
108    pub requests: Vec<ScheduledRequest>,
109    /// Maximum sequence length in batch
110    pub max_sequence_length: usize,
111    /// Estimated execution time
112    pub estimated_time_ms: Option<u64>,
113    /// Resource requirements
114    pub resource_requirements: BatchResourceRequirements,
115    /// Batch creation timestamp
116    pub created_at: chrono::DateTime<chrono::Utc>,
117}
118
119impl BatchPlan {
120    /// Get total number of tokens in batch
121    pub fn total_tokens(&self) -> usize {
122        self.requests
123            .iter()
124            .map(|req| req.request.sampling_params.max_tokens)
125            .sum()
126    }
127
128    /// Get batch size
129    pub fn size(&self) -> usize {
130        self.requests.len()
131    }
132
133    /// Check if batch is empty
134    pub fn is_empty(&self) -> bool {
135        self.requests.is_empty()
136    }
137
138    /// Get highest priority in batch
139    pub fn max_priority(&self) -> Priority {
140        self.requests
141            .iter()
142            .map(|req| req.request.priority)
143            .max()
144            .unwrap_or(Priority::Low)
145    }
146}
147
148/// Scheduled request with additional metadata
149#[derive(Debug, Clone)]
150pub struct ScheduledRequest {
151    /// Original inference request
152    pub request: InferenceRequest,
153    /// Current scheduling state
154    pub state: RequestState,
155    /// Queue position when waiting
156    pub queue_position: Option<usize>,
157    /// Estimated wait time
158    pub estimated_wait_time: Option<Duration>,
159    /// Number of tokens processed so far
160    pub tokens_processed: usize,
161    /// Allocated resources
162    pub allocated_resources: AllocatedResources,
163    /// Request submission time
164    pub submitted_at: chrono::DateTime<chrono::Utc>,
165    /// Request start time (when moved from waiting to running)
166    pub started_at: Option<chrono::DateTime<chrono::Utc>>,
167}
168
169impl ScheduledRequest {
170    /// Create new scheduled request
171    pub fn new(request: InferenceRequest) -> Self {
172        Self {
173            request,
174            state: RequestState::Waiting,
175            queue_position: None,
176            estimated_wait_time: None,
177            tokens_processed: 0,
178            allocated_resources: AllocatedResources::default(),
179            submitted_at: chrono::Utc::now(),
180            started_at: None,
181        }
182    }
183
184    /// Get request age since submission
185    pub fn age(&self) -> Duration {
186        (chrono::Utc::now() - self.submitted_at)
187            .to_std()
188            .unwrap_or_default()
189    }
190
191    /// Get processing time (if started)
192    pub fn processing_time(&self) -> Option<Duration> {
193        self.started_at
194            .map(|start| (chrono::Utc::now() - start).to_std().unwrap_or_default())
195    }
196}
197
198/// Allocated resources for a request
199#[derive(Debug, Clone, Default)]
200pub struct AllocatedResources {
201    /// KV cache blocks allocated
202    pub kv_cache_blocks: Vec<ferrum_types::BlockId>,
203    /// GPU memory allocated (bytes)
204    pub gpu_memory: u64,
205    /// CPU memory allocated (bytes)
206    pub cpu_memory: u64,
207    /// Compute units reserved
208    pub compute_units: usize,
209}
210
/// Resources a batch needs in order to execute.
///
/// `Default` is the all-zero requirement, mirroring `AllocatedResources`'s
/// empty default. `PartialEq`/`Eq` allow plans to be compared directly.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct BatchResourceRequirements {
    /// Required GPU memory (presumably bytes, like `AllocatedResources::gpu_memory`).
    pub gpu_memory: u64,
    /// Required CPU memory (presumably bytes, like `AllocatedResources::cpu_memory`).
    pub cpu_memory: u64,
    /// Number of KV-cache blocks required.
    pub kv_cache_blocks: usize,
    /// Compute units required.
    pub compute_units: usize,
}
223
224/// Preemption result
225#[derive(Debug, Clone)]
226pub struct PreemptionResult {
227    /// Whether preemption was successful
228    pub success: bool,
229    /// Saved state for resumption (if any)
230    pub saved_state: Option<PreemptionState>,
231    /// Resources freed by preemption
232    pub freed_resources: AllocatedResources,
233}
234
235/// State saved during preemption
236#[derive(Debug, Clone)]
237pub struct PreemptionState {
238    /// KV cache checkpoint
239    pub kv_cache_checkpoint: Vec<u8>,
240    /// Number of tokens processed
241    pub tokens_processed: usize,
242    /// Generation state
243    pub generation_state: HashMap<String, serde_json::Value>,
244}
245
246/// Scheduler configuration
247pub type SchedulerConfig = TypesSchedulerConfig;
248
249/// Scheduling policies
250#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
251pub enum SchedulingPolicy {
252    /// First-Come-First-Served
253    FCFS,
254    /// Priority-based scheduling
255    Priority,
256    /// Fair-share scheduling
257    FairShare,
258    /// Shortest-Job-First
259    SJF,
260    /// Resource-aware scheduling
261    ResourceAware,
262    /// SLA-driven scheduling
263    SlaAware,
264}
265
266/// Fair share configuration
267#[derive(Debug, Clone, Serialize, Deserialize)]
268pub struct FairShareConfig {
269    /// Share weights per client
270    pub client_shares: HashMap<String, f32>,
271    /// Default share for unspecified clients
272    pub default_share: f32,
273    /// Share enforcement strictness (0.0 - 1.0)
274    pub enforcement_strictness: f32,
275}
276
277/// SLA configuration
278#[derive(Debug, Clone, Serialize, Deserialize)]
279pub struct SlaConfig {
280    /// Enable SLA enforcement
281    pub enabled: bool,
282    /// Default SLA for requests without specific SLA
283    pub default_sla: SlaRequirements,
284    /// Per-client SLA overrides
285    pub client_slas: HashMap<String, SlaRequirements>,
286}
287
288/// SLA requirements
289#[derive(Debug, Clone, Serialize, Deserialize)]
290pub struct SlaRequirements {
291    /// Maximum latency (P95)
292    pub max_latency_p95_ms: u64,
293    /// Maximum latency (P99)
294    pub max_latency_p99_ms: u64,
295    /// Minimum throughput
296    pub min_throughput_rps: f32,
297    /// Availability requirement
298    pub availability_percent: f32,
299}
300
301/// Resource limits
302#[derive(Debug, Clone, Serialize, Deserialize, Default)]
303pub struct ResourceLimits {
304    /// Maximum total GPU memory
305    pub max_gpu_memory: Option<u64>,
306    /// Maximum total CPU memory
307    pub max_cpu_memory: Option<u64>,
308    /// Maximum KV cache blocks
309    pub max_kv_cache_blocks: Option<usize>,
310    /// Per-client resource limits
311    pub per_client_limits: HashMap<String, ClientResourceLimits>,
312}
313
314/// Per-client resource limits
315#[derive(Debug, Clone, Serialize, Deserialize)]
316pub struct ClientResourceLimits {
317    /// Max concurrent requests per client
318    pub max_concurrent_requests: usize,
319    /// Max GPU memory per client
320    pub max_gpu_memory: Option<u64>,
321    /// Max requests per minute
322    pub max_requests_per_minute: Option<u32>,
323}
324
325pub type SchedulerMetrics = SchedulerStats;
326
327/// Advanced scheduler capabilities
328#[async_trait]
329pub trait AdvancedScheduler: Scheduler {
330    /// Enable resource-aware scheduling
331    async fn enable_resource_awareness(&mut self, config: ResourceAwarenessConfig) -> Result<()>;
332
333    /// Set custom admission policy
334    async fn set_admission_policy(&mut self, policy: Box<dyn AdmissionPolicy>) -> Result<()>;
335
336    /// Configure dynamic batching
337    async fn configure_dynamic_batching(&mut self, config: DynamicBatchingConfig) -> Result<()>;
338
339    /// Get detailed queue analysis
340    fn queue_analysis(&self) -> QueueAnalysis;
341
342    /// Simulate scheduling for capacity planning
343    async fn simulate_load(
344        &self,
345        workload: &SimulatedWorkload,
346    ) -> Result<SchedulingSimulationResult>;
347}
348
349/// Resource awareness configuration
350#[derive(Debug, Clone, Serialize, Deserialize)]
351pub struct ResourceAwarenessConfig {
352    /// Enable memory-aware scheduling
353    pub enable_memory_awareness: bool,
354    /// Enable compute-aware scheduling
355    pub enable_compute_awareness: bool,
356    /// Resource prediction horizon
357    pub prediction_horizon_ms: u64,
358    /// Resource safety margin (0.0 - 1.0)
359    pub safety_margin: f32,
360}
361
362/// Admission policy for request acceptance
363pub trait AdmissionPolicy: Send + Sync {
364    /// Decide whether to admit a request
365    fn should_admit(
366        &self,
367        request: &InferenceRequest,
368        current_metrics: &SchedulerMetrics,
369    ) -> AdmissionDecision;
370
371    /// Get policy name
372    fn name(&self) -> &str;
373}
374
/// Verdict produced by an `AdmissionPolicy`.
#[derive(Debug, Clone)]
pub enum AdmissionDecision {
    /// Admit the request.
    Accept,
    /// Refuse the request; the string carries the reason.
    Reject(String),
    /// Admit the request, suggesting it be delayed by the given duration.
    AcceptWithDelay(Duration),
}
385
386/// Dynamic batching configuration
387#[derive(Debug, Clone, Serialize, Deserialize)]
388pub struct DynamicBatchingConfig {
389    /// Minimum batch size
390    pub min_batch_size: usize,
391    /// Maximum batch size
392    pub max_batch_size: usize,
393    /// Batch formation timeout
394    pub batch_timeout_ms: u64,
395    /// Enable adaptive batch sizing
396    pub enable_adaptive_sizing: bool,
397    /// Target batch utilization
398    pub target_utilization: f32,
399}
400
401/// Queue analysis results
402#[derive(Debug, Clone)]
403pub struct QueueAnalysis {
404    /// Queue depth over time
405    pub queue_depth_history: Vec<(chrono::DateTime<chrono::Utc>, usize)>,
406    /// Wait time distribution
407    pub wait_time_distribution: WaitTimeDistribution,
408    /// Request pattern analysis
409    pub request_patterns: RequestPatternAnalysis,
410    /// Bottleneck identification
411    pub bottlenecks: Vec<BottleneckAnalysis>,
412}
413
/// Summary statistics of request wait times, all in milliseconds.
#[derive(Debug, Clone)]
pub struct WaitTimeDistribution {
    /// Median (P50) wait.
    pub p50_ms: f64,
    /// 95th-percentile wait.
    pub p95_ms: f64,
    /// 99th-percentile wait.
    pub p99_ms: f64,
    /// Longest observed wait.
    pub max_ms: f64,
    /// Mean wait.
    pub mean_ms: f64,
}
428
429/// Request pattern analysis
430#[derive(Debug, Clone)]
431pub struct RequestPatternAnalysis {
432    /// Peak request times
433    pub peak_times: Vec<chrono::DateTime<chrono::Utc>>,
434    /// Request rate trend
435    pub rate_trend: RateTrend,
436    /// Seasonality patterns
437    pub seasonality: SeasonalityPattern,
438}
439
/// Direction in which the request rate is moving.
///
/// Derives `PartialEq`/`Eq`/`Hash` so trends can be compared with `==` or
/// counted in a map.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum RateTrend {
    /// Request rate is rising.
    Increasing,
    /// Request rate is falling.
    Decreasing,
    /// Request rate is roughly constant.
    Stable,
    /// Request rate fluctuates with no consistent direction.
    Volatile,
}
448
/// Recurring load patterns at different time scales.
///
/// NOTE(review): the element meaning (e.g. relative load per hour/day/week
/// slot) is not defined here — confirm with producers of this type.
#[derive(Debug, Clone)]
pub struct SeasonalityPattern {
    /// Pattern over hours.
    pub hourly_pattern: Vec<f32>,
    /// Pattern over days.
    pub daily_pattern: Vec<f32>,
    /// Pattern over weeks.
    pub weekly_pattern: Vec<f32>,
}
459
460/// Bottleneck analysis
461#[derive(Debug, Clone)]
462pub struct BottleneckAnalysis {
463    /// Bottleneck type
464    pub bottleneck_type: BottleneckType,
465    /// Severity (0.0 - 1.0)
466    pub severity: f32,
467    /// Description
468    pub description: String,
469    /// Suggested mitigation
470    pub mitigation: String,
471}
472
/// Category of a detected bottleneck.
///
/// Derives `PartialEq`/`Eq`/`Hash` so bottlenecks can be compared or grouped
/// by type (e.g. as `HashMap` keys).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum BottleneckType {
    /// Memory bottleneck.
    Memory,
    /// Compute bottleneck.
    Compute,
    /// I/O bottleneck.
    IO,
    /// Scheduling bottleneck.
    Scheduling,
    /// Network bottleneck.
    Network,
}
487
488/// Simulated workload for capacity planning
489#[derive(Debug, Clone)]
490pub struct SimulatedWorkload {
491    /// Request arrival pattern
492    pub arrival_pattern: ArrivalPattern,
493    /// Request size distribution
494    pub size_distribution: SizeDistribution,
495    /// Simulation duration
496    pub duration_seconds: u64,
497}
498
/// Model of request arrivals used in simulations.
#[derive(Debug, Clone)]
pub enum ArrivalPattern {
    /// Fixed arrival rate.
    Constant {
        /// Requests per second.
        rate_rps: f32,
    },
    /// Poisson arrival process.
    Poisson {
        /// Rate parameter (presumably requests per second).
        lambda: f32,
    },
    /// Alternating bursts and quiet periods.
    Bursty {
        /// Arrival rate during a burst.
        burst_rate: f32,
        /// Arrival rate between bursts.
        quiet_rate: f32,
        /// Length of each burst, in seconds.
        burst_duration_s: f32,
    },
    /// Base rate modulated by periodic peaks.
    Seasonal {
        /// Arrival rate outside peaks.
        base_rate: f32,
        /// Peaks as `(time, multiplier)` pairs; time units are not specified here.
        peaks: Vec<(f32, f32)>,
    },
}
518
519/// Request size distribution
520#[derive(Debug, Clone)]
521pub enum SizeDistribution {
522    /// Fixed size
523    Fixed { tokens: usize },
524    /// Uniform distribution
525    Uniform {
526        min_tokens: usize,
527        max_tokens: usize,
528    },
529    /// Normal distribution
530    Normal { mean: f32, std_dev: f32 },
531    /// Log-normal distribution
532    LogNormal { mu: f32, sigma: f32 },
533}
534
535/// Scheduling simulation results
536#[derive(Debug, Clone)]
537pub struct SchedulingSimulationResult {
538    /// Total requests processed
539    pub total_requests: u64,
540    /// Successful requests
541    pub successful_requests: u64,
542    /// Failed/rejected requests
543    pub failed_requests: u64,
544    /// Average latency
545    pub avg_latency_ms: f64,
546    /// P95 latency
547    pub p95_latency_ms: f64,
548    /// P99 latency
549    pub p99_latency_ms: f64,
550    /// Throughput achieved
551    pub throughput_rps: f32,
552    /// Resource utilization (optional placeholder)
553    pub resource_utilization: Option<ResourceStats>,
554    /// Predicted bottlenecks
555    pub bottlenecks: Vec<BottleneckAnalysis>,
556}
557
558#[derive(Debug, Clone, Serialize, Deserialize, Default)]
559pub struct ResourceStats {
560    pub gpu_memory_bytes: Option<u64>,
561    pub cpu_memory_bytes: Option<u64>,
562    pub compute_utilization: Option<f32>,
563}