Skip to main content

heliosdb_proxy/distribcache/
scheduler.rs

1//! Workload scheduler for cache resource allocation
2//!
3//! Schedules cache operations based on workload type and priority.
4//! Supports multiple scheduling policies.
5
6use chrono::Timelike;
7use dashmap::DashMap;
8use std::collections::HashMap;
9use std::sync::atomic::{AtomicU32, AtomicU64, Ordering};
10use std::sync::RwLock;
11
12use super::DistribCacheConfig;
13use super::classifier::WorkloadType;
14use super::config::SchedulingPolicy;
15
16/// Scheduled query
17#[derive(Debug, Clone)]
18pub struct ScheduledQuery {
19    /// Query identifier
20    pub id: u64,
21    /// Workload type
22    pub workload_type: WorkloadType,
23    /// Request timestamp
24    pub timestamp: std::time::Instant,
25}
26
27/// Schedule result
28#[derive(Debug, Clone)]
29pub enum ScheduleResult {
30    /// Execute immediately
31    Execute { priority: QueryPriority },
32    /// Queue for later execution
33    Queued { position: usize },
34    /// Reject due to resource constraints
35    Rejected { reason: String },
36}
37
/// Priority class assigned to a query that is allowed to execute.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum QueryPriority {
    /// Highest priority class.
    High,
    /// Default priority class.
    Normal,
    /// Lowest priority class.
    Low,
}
45
46/// Workload distribution snapshot
47#[derive(Debug, Clone)]
48pub struct WorkloadDistribution {
49    /// OLTP percentage
50    pub oltp: WorkloadSlot,
51    /// OLAP percentage
52    pub olap: WorkloadSlot,
53    /// Vector percentage
54    pub vector: WorkloadSlot,
55    /// AI Agent percentage
56    pub ai_agent: WorkloadSlot,
57    /// RAG percentage
58    pub rag: WorkloadSlot,
59}
60
/// Per-workload figures reported inside a distribution snapshot.
#[derive(Debug, Clone)]
pub struct WorkloadSlot {
    /// Share of all active queries held by this workload, in percent.
    pub current_pct: f64,
    /// Target share for this workload, in percent.
    pub target_pct: f64,
    /// Number of queries waiting in this workload's queue.
    pub queued: u32,
    /// Number of queries of this workload currently marked active.
    pub active: u32,
}
73
74/// Per-workload queue
75struct WorkloadQueue {
76    /// Pending queries
77    pending: std::collections::VecDeque<ScheduledQuery>,
78    /// Active count
79    active: AtomicU32,
80    /// Total processed
81    total_processed: AtomicU64,
82}
83
84impl WorkloadQueue {
85    fn new() -> Self {
86        Self {
87            pending: std::collections::VecDeque::new(),
88            active: AtomicU32::new(0),
89            total_processed: AtomicU64::new(0),
90        }
91    }
92}
93
94/// Workload scheduler
95pub struct WorkloadScheduler {
96    /// Configuration
97    config: DistribCacheConfig,
98
99    /// Queues per workload type
100    queues: DashMap<WorkloadType, RwLock<WorkloadQueue>>,
101
102    /// Resource limits per workload
103    limits: HashMap<WorkloadType, ResourceLimit>,
104
105    /// Scheduling policy
106    policy: SchedulingPolicy,
107
108    /// Statistics
109    stats: SchedulerStats,
110}
111
/// Resource budget applied to one workload type.
#[derive(Debug, Clone)]
pub struct ResourceLimit {
    /// Upper bound on queries running at the same time.
    pub max_concurrent: u32,
    /// Cache memory budget, in megabytes.
    pub max_cache_mb: usize,
    /// Relative priority weight, expected in the range 0.0 to 1.0.
    pub priority_weight: f64,
}

impl Default for ResourceLimit {
    /// Fallback limit: 100 concurrent queries, 64 MB of cache, weight 0.5.
    fn default() -> Self {
        const DEFAULT_MAX_CONCURRENT: u32 = 100;
        const DEFAULT_MAX_CACHE_MB: usize = 64;
        const DEFAULT_PRIORITY_WEIGHT: f64 = 0.5;

        ResourceLimit {
            max_concurrent: DEFAULT_MAX_CONCURRENT,
            max_cache_mb: DEFAULT_MAX_CACHE_MB,
            priority_weight: DEFAULT_PRIORITY_WEIGHT,
        }
    }
}
132
/// Scheduler-wide counters, updated atomically.
#[derive(Debug, Default)]
struct SchedulerStats {
    /// Number of queries submitted for scheduling.
    total_scheduled: AtomicU64,
    /// Number of queries that were placed in a pending queue.
    total_queued: AtomicU64,
    /// Number of queries that were rejected.
    total_rejected: AtomicU64,
    /// Number of queries currently marked active, across all workloads.
    current_active: AtomicU32,
}
141
142impl WorkloadScheduler {
143    /// Create a new scheduler
144    pub fn new(config: DistribCacheConfig) -> Self {
145        let mut limits = HashMap::new();
146
147        limits.insert(WorkloadType::OLTP, ResourceLimit {
148            max_concurrent: config.max_concurrent_oltp,
149            max_cache_mb: 64,
150            priority_weight: config.oltp_priority,
151        });
152
153        limits.insert(WorkloadType::OLAP, ResourceLimit {
154            max_concurrent: config.max_concurrent_olap,
155            max_cache_mb: 128,
156            priority_weight: config.olap_priority,
157        });
158
159        limits.insert(WorkloadType::Vector, ResourceLimit {
160            max_concurrent: config.max_concurrent_vector,
161            max_cache_mb: 96,
162            priority_weight: config.vector_priority,
163        });
164
165        limits.insert(WorkloadType::AIAgent, ResourceLimit {
166            max_concurrent: config.max_concurrent_ai,
167            max_cache_mb: 64,
168            priority_weight: config.ai_agent_priority,
169        });
170
171        limits.insert(WorkloadType::RAG, ResourceLimit {
172            max_concurrent: config.max_concurrent_ai,
173            max_cache_mb: 64,
174            priority_weight: config.ai_agent_priority,
175        });
176
177        limits.insert(WorkloadType::Mixed, ResourceLimit::default());
178
179        let queues = DashMap::new();
180        for wt in [
181            WorkloadType::OLTP,
182            WorkloadType::OLAP,
183            WorkloadType::Vector,
184            WorkloadType::AIAgent,
185            WorkloadType::RAG,
186            WorkloadType::Mixed,
187        ] {
188            queues.insert(wt, RwLock::new(WorkloadQueue::new()));
189        }
190
191        Self {
192            policy: config.scheduling_policy,
193            config,
194            queues,
195            limits,
196            stats: SchedulerStats::default(),
197        }
198    }
199
200    /// Schedule a query
201    pub fn schedule(&self, query: ScheduledQuery) -> ScheduleResult {
202        self.stats.total_scheduled.fetch_add(1, Ordering::Relaxed);
203
204        let workload = query.workload_type;
205        let default_limit = ResourceLimit::default();
206        let limit = self.limits.get(&workload).unwrap_or(&default_limit);
207
208        // Check current concurrency
209        let current = self.get_current_concurrency(&workload);
210        if current >= limit.max_concurrent {
211            // Queue the request
212            self.enqueue(query.clone());
213            self.stats.total_queued.fetch_add(1, Ordering::Relaxed);
214            return ScheduleResult::Queued {
215                position: self.queue_position(&workload),
216            };
217        }
218
219        // Apply scheduling policy
220        match self.policy {
221            SchedulingPolicy::StrictPriority => self.schedule_strict_priority(query),
222            SchedulingPolicy::WeightedFair => self.schedule_weighted_fair(query),
223            SchedulingPolicy::TimeBased => self.schedule_time_based(query),
224            SchedulingPolicy::Adaptive => self.schedule_adaptive(query),
225        }
226    }
227
228    /// Strict priority scheduling (OLTP always first)
229    fn schedule_strict_priority(&self, query: ScheduledQuery) -> ScheduleResult {
230        let priority = match query.workload_type {
231            WorkloadType::OLTP => QueryPriority::High,
232            WorkloadType::AIAgent | WorkloadType::RAG => QueryPriority::Normal,
233            WorkloadType::Vector => QueryPriority::Normal,
234            WorkloadType::OLAP => QueryPriority::Low,
235            WorkloadType::Mixed => QueryPriority::Normal,
236        };
237
238        self.mark_active(&query.workload_type);
239        ScheduleResult::Execute { priority }
240    }
241
242    /// Weighted fair scheduling
243    fn schedule_weighted_fair(&self, query: ScheduledQuery) -> ScheduleResult {
244        let limit = self.limits.get(&query.workload_type).unwrap();
245        let weight = limit.priority_weight;
246
247        let priority = if weight >= 0.8 {
248            QueryPriority::High
249        } else if weight >= 0.4 {
250            QueryPriority::Normal
251        } else {
252            QueryPriority::Low
253        };
254
255        self.mark_active(&query.workload_type);
256        ScheduleResult::Execute { priority }
257    }
258
259    /// Time-based scheduling
260    fn schedule_time_based(&self, query: ScheduledQuery) -> ScheduleResult {
261        let hour = chrono::Utc::now().hour();
262
263        // Business hours (9-18): prioritize OLTP
264        let priority = if hour >= 9 && hour < 18 {
265            match query.workload_type {
266                WorkloadType::OLTP | WorkloadType::AIAgent => QueryPriority::High,
267                WorkloadType::OLAP => QueryPriority::Low,
268                _ => QueryPriority::Normal,
269            }
270        } else {
271            // Off-hours: prioritize OLAP
272            match query.workload_type {
273                WorkloadType::OLAP => QueryPriority::High,
274                WorkloadType::OLTP => QueryPriority::Normal,
275                _ => QueryPriority::Normal,
276            }
277        };
278
279        self.mark_active(&query.workload_type);
280        ScheduleResult::Execute { priority }
281    }
282
283    /// Adaptive scheduling (learns optimal distribution)
284    fn schedule_adaptive(&self, query: ScheduledQuery) -> ScheduleResult {
285        // Get current and ideal distribution
286        let distribution = self.get_distribution();
287        let workload = query.workload_type;
288
289        let slot = match workload {
290            WorkloadType::OLTP => &distribution.oltp,
291            WorkloadType::OLAP => &distribution.olap,
292            WorkloadType::Vector => &distribution.vector,
293            WorkloadType::AIAgent => &distribution.ai_agent,
294            WorkloadType::RAG => &distribution.rag,
295            WorkloadType::Mixed => &distribution.oltp, // Default to OLTP behavior
296        };
297
298        let priority = if slot.current_pct < slot.target_pct {
299            QueryPriority::High // Below target, prioritize
300        } else if slot.current_pct > slot.target_pct * 1.2 {
301            QueryPriority::Low // Above target, deprioritize
302        } else {
303            QueryPriority::Normal
304        };
305
306        self.mark_active(&query.workload_type);
307        ScheduleResult::Execute { priority }
308    }
309
310    /// Get current concurrency for a workload
311    fn get_current_concurrency(&self, workload: &WorkloadType) -> u32 {
312        self.queues.get(workload)
313            .map(|q| q.read().unwrap().active.load(Ordering::Relaxed))
314            .unwrap_or(0)
315    }
316
317    /// Get queue position
318    fn queue_position(&self, workload: &WorkloadType) -> usize {
319        self.queues.get(workload)
320            .map(|q| q.read().unwrap().pending.len())
321            .unwrap_or(0)
322    }
323
324    /// Enqueue a query
325    fn enqueue(&self, query: ScheduledQuery) {
326        if let Some(queue) = self.queues.get(&query.workload_type) {
327            queue.write().unwrap().pending.push_back(query);
328        }
329    }
330
331    /// Mark a query as active
332    fn mark_active(&self, workload: &WorkloadType) {
333        if let Some(queue) = self.queues.get(workload) {
334            queue.read().unwrap().active.fetch_add(1, Ordering::Relaxed);
335        }
336        self.stats.current_active.fetch_add(1, Ordering::Relaxed);
337    }
338
339    /// Mark a query as complete
340    pub fn mark_complete(&self, workload: WorkloadType) {
341        if let Some(queue) = self.queues.get(&workload) {
342            let q = queue.read().unwrap();
343            q.active.fetch_sub(1, Ordering::Relaxed);
344            q.total_processed.fetch_add(1, Ordering::Relaxed);
345        }
346        self.stats.current_active.fetch_sub(1, Ordering::Relaxed);
347    }
348
349    /// Get workload distribution
350    pub fn get_distribution(&self) -> WorkloadDistribution {
351        let total_active = self.stats.current_active.load(Ordering::Relaxed) as f64;
352
353        let get_slot = |wt: WorkloadType| -> WorkloadSlot {
354            let queue = self.queues.get(&wt).unwrap();
355            let q = queue.read().unwrap();
356            let active = q.active.load(Ordering::Relaxed);
357            let limit = self.limits.get(&wt).unwrap();
358
359            WorkloadSlot {
360                current_pct: if total_active > 0.0 {
361                    active as f64 / total_active * 100.0
362                } else {
363                    0.0
364                },
365                target_pct: limit.priority_weight * 100.0 / 2.5, // Normalize
366                queued: q.pending.len() as u32,
367                active,
368            }
369        };
370
371        WorkloadDistribution {
372            oltp: get_slot(WorkloadType::OLTP),
373            olap: get_slot(WorkloadType::OLAP),
374            vector: get_slot(WorkloadType::Vector),
375            ai_agent: get_slot(WorkloadType::AIAgent),
376            rag: get_slot(WorkloadType::RAG),
377        }
378    }
379
380    /// Get scheduler statistics
381    pub fn stats(&self) -> SchedulerStatsSnapshot {
382        SchedulerStatsSnapshot {
383            total_scheduled: self.stats.total_scheduled.load(Ordering::Relaxed),
384            total_queued: self.stats.total_queued.load(Ordering::Relaxed),
385            total_rejected: self.stats.total_rejected.load(Ordering::Relaxed),
386            current_active: self.stats.current_active.load(Ordering::Relaxed),
387            policy: self.policy,
388        }
389    }
390}
391
392/// Scheduler statistics snapshot
393#[derive(Debug, Clone)]
394pub struct SchedulerStatsSnapshot {
395    pub total_scheduled: u64,
396    pub total_queued: u64,
397    pub total_rejected: u64,
398    pub current_active: u32,
399    pub policy: SchedulingPolicy,
400}
401
#[cfg(test)]
mod tests {
    use super::*;

    /// Build a query of the given workload type stamped with the current instant.
    fn make_query(id: u64, workload_type: WorkloadType) -> ScheduledQuery {
        ScheduledQuery {
            id,
            workload_type,
            timestamp: std::time::Instant::now(),
        }
    }

    /// Scheduler built from the default configuration.
    fn default_scheduler() -> WorkloadScheduler {
        WorkloadScheduler::new(DistribCacheConfig::default())
    }

    #[test]
    fn test_schedule_oltp() {
        let scheduler = default_scheduler();

        let result = scheduler.schedule(make_query(1, WorkloadType::OLTP));
        assert!(matches!(result, ScheduleResult::Execute { .. }));
    }

    #[test]
    fn test_schedule_with_concurrency_limit() {
        let mut config = DistribCacheConfig::default();
        config.max_concurrent_oltp = 1;

        let scheduler = WorkloadScheduler::new(config);

        // The single available slot goes to the first query.
        let first = scheduler.schedule(make_query(1, WorkloadType::OLTP));
        assert!(matches!(first, ScheduleResult::Execute { .. }));

        // With the slot taken, the next query has to wait.
        let second = scheduler.schedule(make_query(2, WorkloadType::OLTP));
        assert!(matches!(second, ScheduleResult::Queued { .. }));
    }

    #[test]
    fn test_mark_complete() {
        let scheduler = default_scheduler();

        scheduler.schedule(make_query(1, WorkloadType::OLTP));
        assert_eq!(scheduler.stats().current_active, 1);

        scheduler.mark_complete(WorkloadType::OLTP);
        assert_eq!(scheduler.stats().current_active, 0);
    }

    #[test]
    fn test_get_distribution() {
        let scheduler = default_scheduler();

        // Five OLTP queries (ids 0..5) followed by three OLAP queries (ids 10..13).
        for id in 0..5 {
            scheduler.schedule(make_query(id, WorkloadType::OLTP));
        }
        for offset in 0..3 {
            scheduler.schedule(make_query(offset + 10, WorkloadType::OLAP));
        }

        let dist = scheduler.get_distribution();
        assert!(dist.oltp.active > 0);
        assert!(dist.olap.active > 0);
    }
}
493}