1use chrono::Timelike;
7use dashmap::DashMap;
8use std::collections::HashMap;
9use std::sync::atomic::{AtomicU32, AtomicU64, Ordering};
10use std::sync::RwLock;
11
12use super::DistribCacheConfig;
13use super::classifier::WorkloadType;
14use super::config::SchedulingPolicy;
15
16#[derive(Debug, Clone)]
18pub struct ScheduledQuery {
19 pub id: u64,
21 pub workload_type: WorkloadType,
23 pub timestamp: std::time::Instant,
25}
26
27#[derive(Debug, Clone)]
29pub enum ScheduleResult {
30 Execute { priority: QueryPriority },
32 Queued { position: usize },
34 Rejected { reason: String },
36}
37
/// Execution priority attached to an admitted query.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum QueryPriority {
    /// Run ahead of other work.
    High,
    /// Default priority.
    Normal,
    /// Run after higher-priority work.
    Low,
}
45
46#[derive(Debug, Clone)]
48pub struct WorkloadDistribution {
49 pub oltp: WorkloadSlot,
51 pub olap: WorkloadSlot,
53 pub vector: WorkloadSlot,
55 pub ai_agent: WorkloadSlot,
57 pub rag: WorkloadSlot,
59}
60
/// Load figures for a single workload class.
#[derive(Debug, Clone)]
pub struct WorkloadSlot {
    /// This workload's share of all currently active queries, as a percentage.
    pub current_pct: f64,
    /// Share this workload is aiming for, as a percentage.
    pub target_pct: f64,
    /// Queries waiting in this workload's pending queue.
    pub queued: u32,
    /// Queries of this workload currently executing.
    pub active: u32,
}
73
74struct WorkloadQueue {
76 pending: std::collections::VecDeque<ScheduledQuery>,
78 active: AtomicU32,
80 total_processed: AtomicU64,
82}
83
84impl WorkloadQueue {
85 fn new() -> Self {
86 Self {
87 pending: std::collections::VecDeque::new(),
88 active: AtomicU32::new(0),
89 total_processed: AtomicU64::new(0),
90 }
91 }
92}
93
94pub struct WorkloadScheduler {
96 config: DistribCacheConfig,
98
99 queues: DashMap<WorkloadType, RwLock<WorkloadQueue>>,
101
102 limits: HashMap<WorkloadType, ResourceLimit>,
104
105 policy: SchedulingPolicy,
107
108 stats: SchedulerStats,
110}
111
/// Resource budget for one workload class.
#[derive(Debug, Clone)]
pub struct ResourceLimit {
    /// Maximum number of queries of this workload allowed to execute at once.
    pub max_concurrent: u32,
    /// Cache budget in megabytes.
    /// NOTE(review): not consulted by the scheduler in this module.
    pub max_cache_mb: usize,
    /// Relative weight of this workload, used to derive priorities and target shares.
    pub priority_weight: f64,
}

impl Default for ResourceLimit {
    /// 100 concurrent queries, a 64 MB cache budget and a mid-range weight of 0.5.
    fn default() -> Self {
        let max_concurrent = 100;
        let max_cache_mb = 64;
        let priority_weight = 0.5;
        Self {
            max_concurrent,
            max_cache_mb,
            priority_weight,
        }
    }
}
132
/// Lock-free global counters maintained by the scheduler (all start at zero).
#[derive(Debug, Default)]
struct SchedulerStats {
    /// Every call to `schedule`, whether admitted or queued.
    total_scheduled: AtomicU64,
    /// Calls that ended up in a pending queue.
    total_queued: AtomicU64,
    /// Calls that were rejected outright.
    total_rejected: AtomicU64,
    /// Queries currently executing across all workloads.
    current_active: AtomicU32,
}
141
142impl WorkloadScheduler {
143 pub fn new(config: DistribCacheConfig) -> Self {
145 let mut limits = HashMap::new();
146
147 limits.insert(WorkloadType::OLTP, ResourceLimit {
148 max_concurrent: config.max_concurrent_oltp,
149 max_cache_mb: 64,
150 priority_weight: config.oltp_priority,
151 });
152
153 limits.insert(WorkloadType::OLAP, ResourceLimit {
154 max_concurrent: config.max_concurrent_olap,
155 max_cache_mb: 128,
156 priority_weight: config.olap_priority,
157 });
158
159 limits.insert(WorkloadType::Vector, ResourceLimit {
160 max_concurrent: config.max_concurrent_vector,
161 max_cache_mb: 96,
162 priority_weight: config.vector_priority,
163 });
164
165 limits.insert(WorkloadType::AIAgent, ResourceLimit {
166 max_concurrent: config.max_concurrent_ai,
167 max_cache_mb: 64,
168 priority_weight: config.ai_agent_priority,
169 });
170
171 limits.insert(WorkloadType::RAG, ResourceLimit {
172 max_concurrent: config.max_concurrent_ai,
173 max_cache_mb: 64,
174 priority_weight: config.ai_agent_priority,
175 });
176
177 limits.insert(WorkloadType::Mixed, ResourceLimit::default());
178
179 let queues = DashMap::new();
180 for wt in [
181 WorkloadType::OLTP,
182 WorkloadType::OLAP,
183 WorkloadType::Vector,
184 WorkloadType::AIAgent,
185 WorkloadType::RAG,
186 WorkloadType::Mixed,
187 ] {
188 queues.insert(wt, RwLock::new(WorkloadQueue::new()));
189 }
190
191 Self {
192 policy: config.scheduling_policy,
193 config,
194 queues,
195 limits,
196 stats: SchedulerStats::default(),
197 }
198 }
199
200 pub fn schedule(&self, query: ScheduledQuery) -> ScheduleResult {
202 self.stats.total_scheduled.fetch_add(1, Ordering::Relaxed);
203
204 let workload = query.workload_type;
205 let default_limit = ResourceLimit::default();
206 let limit = self.limits.get(&workload).unwrap_or(&default_limit);
207
208 let current = self.get_current_concurrency(&workload);
210 if current >= limit.max_concurrent {
211 self.enqueue(query.clone());
213 self.stats.total_queued.fetch_add(1, Ordering::Relaxed);
214 return ScheduleResult::Queued {
215 position: self.queue_position(&workload),
216 };
217 }
218
219 match self.policy {
221 SchedulingPolicy::StrictPriority => self.schedule_strict_priority(query),
222 SchedulingPolicy::WeightedFair => self.schedule_weighted_fair(query),
223 SchedulingPolicy::TimeBased => self.schedule_time_based(query),
224 SchedulingPolicy::Adaptive => self.schedule_adaptive(query),
225 }
226 }
227
228 fn schedule_strict_priority(&self, query: ScheduledQuery) -> ScheduleResult {
230 let priority = match query.workload_type {
231 WorkloadType::OLTP => QueryPriority::High,
232 WorkloadType::AIAgent | WorkloadType::RAG => QueryPriority::Normal,
233 WorkloadType::Vector => QueryPriority::Normal,
234 WorkloadType::OLAP => QueryPriority::Low,
235 WorkloadType::Mixed => QueryPriority::Normal,
236 };
237
238 self.mark_active(&query.workload_type);
239 ScheduleResult::Execute { priority }
240 }
241
242 fn schedule_weighted_fair(&self, query: ScheduledQuery) -> ScheduleResult {
244 let limit = self.limits.get(&query.workload_type).unwrap();
245 let weight = limit.priority_weight;
246
247 let priority = if weight >= 0.8 {
248 QueryPriority::High
249 } else if weight >= 0.4 {
250 QueryPriority::Normal
251 } else {
252 QueryPriority::Low
253 };
254
255 self.mark_active(&query.workload_type);
256 ScheduleResult::Execute { priority }
257 }
258
259 fn schedule_time_based(&self, query: ScheduledQuery) -> ScheduleResult {
261 let hour = chrono::Utc::now().hour();
262
263 let priority = if hour >= 9 && hour < 18 {
265 match query.workload_type {
266 WorkloadType::OLTP | WorkloadType::AIAgent => QueryPriority::High,
267 WorkloadType::OLAP => QueryPriority::Low,
268 _ => QueryPriority::Normal,
269 }
270 } else {
271 match query.workload_type {
273 WorkloadType::OLAP => QueryPriority::High,
274 WorkloadType::OLTP => QueryPriority::Normal,
275 _ => QueryPriority::Normal,
276 }
277 };
278
279 self.mark_active(&query.workload_type);
280 ScheduleResult::Execute { priority }
281 }
282
283 fn schedule_adaptive(&self, query: ScheduledQuery) -> ScheduleResult {
285 let distribution = self.get_distribution();
287 let workload = query.workload_type;
288
289 let slot = match workload {
290 WorkloadType::OLTP => &distribution.oltp,
291 WorkloadType::OLAP => &distribution.olap,
292 WorkloadType::Vector => &distribution.vector,
293 WorkloadType::AIAgent => &distribution.ai_agent,
294 WorkloadType::RAG => &distribution.rag,
295 WorkloadType::Mixed => &distribution.oltp, };
297
298 let priority = if slot.current_pct < slot.target_pct {
299 QueryPriority::High } else if slot.current_pct > slot.target_pct * 1.2 {
301 QueryPriority::Low } else {
303 QueryPriority::Normal
304 };
305
306 self.mark_active(&query.workload_type);
307 ScheduleResult::Execute { priority }
308 }
309
310 fn get_current_concurrency(&self, workload: &WorkloadType) -> u32 {
312 self.queues.get(workload)
313 .map(|q| q.read().unwrap().active.load(Ordering::Relaxed))
314 .unwrap_or(0)
315 }
316
317 fn queue_position(&self, workload: &WorkloadType) -> usize {
319 self.queues.get(workload)
320 .map(|q| q.read().unwrap().pending.len())
321 .unwrap_or(0)
322 }
323
324 fn enqueue(&self, query: ScheduledQuery) {
326 if let Some(queue) = self.queues.get(&query.workload_type) {
327 queue.write().unwrap().pending.push_back(query);
328 }
329 }
330
331 fn mark_active(&self, workload: &WorkloadType) {
333 if let Some(queue) = self.queues.get(workload) {
334 queue.read().unwrap().active.fetch_add(1, Ordering::Relaxed);
335 }
336 self.stats.current_active.fetch_add(1, Ordering::Relaxed);
337 }
338
339 pub fn mark_complete(&self, workload: WorkloadType) {
341 if let Some(queue) = self.queues.get(&workload) {
342 let q = queue.read().unwrap();
343 q.active.fetch_sub(1, Ordering::Relaxed);
344 q.total_processed.fetch_add(1, Ordering::Relaxed);
345 }
346 self.stats.current_active.fetch_sub(1, Ordering::Relaxed);
347 }
348
349 pub fn get_distribution(&self) -> WorkloadDistribution {
351 let total_active = self.stats.current_active.load(Ordering::Relaxed) as f64;
352
353 let get_slot = |wt: WorkloadType| -> WorkloadSlot {
354 let queue = self.queues.get(&wt).unwrap();
355 let q = queue.read().unwrap();
356 let active = q.active.load(Ordering::Relaxed);
357 let limit = self.limits.get(&wt).unwrap();
358
359 WorkloadSlot {
360 current_pct: if total_active > 0.0 {
361 active as f64 / total_active * 100.0
362 } else {
363 0.0
364 },
365 target_pct: limit.priority_weight * 100.0 / 2.5, queued: q.pending.len() as u32,
367 active,
368 }
369 };
370
371 WorkloadDistribution {
372 oltp: get_slot(WorkloadType::OLTP),
373 olap: get_slot(WorkloadType::OLAP),
374 vector: get_slot(WorkloadType::Vector),
375 ai_agent: get_slot(WorkloadType::AIAgent),
376 rag: get_slot(WorkloadType::RAG),
377 }
378 }
379
380 pub fn stats(&self) -> SchedulerStatsSnapshot {
382 SchedulerStatsSnapshot {
383 total_scheduled: self.stats.total_scheduled.load(Ordering::Relaxed),
384 total_queued: self.stats.total_queued.load(Ordering::Relaxed),
385 total_rejected: self.stats.total_rejected.load(Ordering::Relaxed),
386 current_active: self.stats.current_active.load(Ordering::Relaxed),
387 policy: self.policy,
388 }
389 }
390}
391
392#[derive(Debug, Clone)]
394pub struct SchedulerStatsSnapshot {
395 pub total_scheduled: u64,
396 pub total_queued: u64,
397 pub total_rejected: u64,
398 pub current_active: u32,
399 pub policy: SchedulingPolicy,
400}
401
#[cfg(test)]
mod tests {
    use super::*;
    use std::time::Instant;

    /// Builds a query of the given workload type, stamped with the current instant.
    fn make_query(id: u64, workload_type: WorkloadType) -> ScheduledQuery {
        ScheduledQuery {
            id,
            workload_type,
            timestamp: Instant::now(),
        }
    }

    #[test]
    fn test_schedule_oltp() {
        let scheduler = WorkloadScheduler::new(DistribCacheConfig::default());

        let outcome = scheduler.schedule(make_query(1, WorkloadType::OLTP));
        assert!(matches!(outcome, ScheduleResult::Execute { .. }));
    }

    #[test]
    fn test_schedule_with_concurrency_limit() {
        let mut config = DistribCacheConfig::default();
        config.max_concurrent_oltp = 1;
        let scheduler = WorkloadScheduler::new(config);

        // The first query takes the only slot...
        let first = scheduler.schedule(make_query(1, WorkloadType::OLTP));
        assert!(matches!(first, ScheduleResult::Execute { .. }));

        // ...so the second one has to wait.
        let second = scheduler.schedule(make_query(2, WorkloadType::OLTP));
        assert!(matches!(second, ScheduleResult::Queued { .. }));
    }

    #[test]
    fn test_mark_complete() {
        let scheduler = WorkloadScheduler::new(DistribCacheConfig::default());

        scheduler.schedule(make_query(1, WorkloadType::OLTP));
        assert_eq!(scheduler.stats().current_active, 1);

        scheduler.mark_complete(WorkloadType::OLTP);
        assert_eq!(scheduler.stats().current_active, 0);
    }

    #[test]
    fn test_get_distribution() {
        let scheduler = WorkloadScheduler::new(DistribCacheConfig::default());

        for id in 0..5 {
            scheduler.schedule(make_query(id, WorkloadType::OLTP));
        }
        for id in 0..3 {
            scheduler.schedule(make_query(id + 10, WorkloadType::OLAP));
        }

        let dist = scheduler.get_distribution();
        assert!(dist.oltp.active > 0);
        assert!(dist.olap.active > 0);
    }
}