1use crate::{
6 backend_traits::query_backend_capabilities, translation::HardwareBackend, CircuitResult,
7 DeviceError, DeviceResult,
8};
9use quantrs2_circuit::{optimization::analysis::CircuitAnalyzer, prelude::Circuit};
10use std::collections::{BTreeMap, HashMap, HashSet, VecDeque};
11use std::sync::{Arc, Mutex, RwLock};
12use std::time::{Duration, SystemTime};
13use tokio::sync::mpsc;
14#[cfg(not(feature = "scirs2"))]
16use super::fallback_scirs2::std as stats_std;
17#[cfg(not(feature = "scirs2"))]
18use super::fallback_scirs2::{mean, minimize, OptimizeResult};
19use super::types::*;
20#[cfg(feature = "scirs2")]
21use scirs2_stats::{mean, std as stats_std};
22
23impl QuantumJobScheduler {
24 pub fn new(params: SchedulingParams) -> Self {
26 let (event_sender, _) = mpsc::unbounded_channel();
27 Self {
28 params: Arc::new(RwLock::new(params)),
29 job_queues: Arc::new(Mutex::new(BTreeMap::new())),
30 jobs: Arc::new(RwLock::new(HashMap::new())),
31 backend_performance: Arc::new(RwLock::new(HashMap::new())),
32 backends: Arc::new(RwLock::new(HashSet::new())),
33 running_jobs: Arc::new(RwLock::new(HashMap::new())),
34 execution_history: Arc::new(RwLock::new(Vec::new())),
35 user_shares: Arc::new(RwLock::new(HashMap::new())),
36 scheduler_running: Arc::new(Mutex::new(false)),
37 event_sender,
38 performance_predictor: Arc::new(Mutex::new(PerformancePredictor::new())),
39 resource_manager: Arc::new(Mutex::new(ResourceManager::new())),
40 job_status_map: Arc::new(RwLock::new(HashMap::new())),
41 job_config_map: Arc::new(RwLock::new(HashMap::new())),
42 job_metrics_map: Arc::new(RwLock::new(HashMap::new())),
43 }
44 }
45 pub async fn register_backend(&self, backend: HardwareBackend) -> DeviceResult<()> {
47 let mut backends = self
48 .backends
49 .write()
50 .expect("Failed to acquire write lock on backends in register_backend");
51 backends.insert(backend);
52 let mut performance = self
53 .backend_performance
54 .write()
55 .expect("Failed to acquire write lock on backend_performance in register_backend");
56 performance.insert(
57 backend,
58 BackendPerformance {
59 backend,
60 queue_length: 0,
61 avg_queue_time: Duration::from_secs(0),
62 avg_execution_time: Duration::from_secs(0),
63 success_rate: 1.0,
64 utilization: 0.0,
65 avg_cost: None,
66 last_updated: SystemTime::now(),
67 history: VecDeque::new(),
68 },
69 );
70 let capabilities = query_backend_capabilities(backend);
71 let mut resource_manager = self
72 .resource_manager
73 .lock()
74 .expect("Failed to acquire lock on resource_manager in register_backend");
75 resource_manager.available_resources.insert(
76 backend,
77 ResourceCapacity {
78 qubits: capabilities.features.max_qubits,
79 max_circuit_depth: capabilities.features.max_depth,
80 memory_mb: 8192,
81 cpu_cores: 4,
82 concurrent_jobs: 10,
83 features: capabilities
84 .features
85 .supported_measurement_bases
86 .into_iter()
87 .collect(),
88 },
89 );
90 Ok(())
91 }
92 pub fn get_available_backends(&self) -> Vec<HardwareBackend> {
94 let backends = self
95 .backends
96 .read()
97 .expect("Failed to acquire read lock on backends in get_available_backends");
98 backends.iter().copied().collect()
99 }
100 pub async fn submit_job<const N: usize>(
102 &self,
103 circuit: Circuit<N>,
104 shots: usize,
105 config: JobConfig,
106 user_id: String,
107 ) -> DeviceResult<JobId> {
108 let job_id = JobId::new();
109 let now = SystemTime::now();
110 self.validate_job_config(&config).await?;
111 let estimated_duration = self
112 .estimate_execution_time(&circuit, shots, &config)
113 .await?;
114 let estimated_cost = self.estimate_cost(&circuit, shots, &config).await?;
115 let job = QuantumJob {
116 id: job_id.clone(),
117 config,
118 circuit,
119 shots,
120 submitted_at: now,
121 status: JobStatus::Pending,
122 execution_history: vec![],
123 metadata: HashMap::new(),
124 user_id: user_id.clone(),
125 group_id: None,
126 estimated_duration: Some(estimated_duration),
127 assigned_backend: None,
128 estimated_cost: Some(estimated_cost),
129 actual_cost: None,
130 };
131 let mut jobs = self
132 .jobs
133 .write()
134 .expect("Failed to acquire write lock on jobs in submit_job");
135 jobs.insert(job_id.clone(), Box::new(job.clone()));
136 drop(jobs);
137 {
138 let mut config_map = self
139 .job_config_map
140 .write()
141 .expect("Failed to acquire write lock on job_config_map in submit_job");
142 config_map.insert(job_id.clone(), job.config.clone());
143 }
144 let mut queues = self
145 .job_queues
146 .lock()
147 .expect("Failed to acquire lock on job_queues in submit_job");
148 let queue = queues.entry(job.config.priority).or_default();
149 queue.push_back(job_id.clone());
150 drop(queues);
151 self.update_user_share(&user_id, 1, 0).await;
152 let _ = self
153 .event_sender
154 .send(SchedulerEvent::JobSubmitted(job_id.clone()));
155 self.ensure_scheduler_running().await;
156 Ok(job_id)
157 }
158 pub async fn cancel_job(&self, job_id: &JobId) -> DeviceResult<bool> {
160 let mut queues = self
161 .job_queues
162 .lock()
163 .expect("Failed to acquire lock on job_queues in cancel_job");
164 for queue in queues.values_mut() {
165 if let Some(pos) = queue.iter().position(|id| id == job_id) {
166 queue.remove(pos);
167 drop(queues);
168 self.update_job_status(job_id, JobStatus::Cancelled).await?;
169 let _ = self
170 .event_sender
171 .send(SchedulerEvent::JobCancelled(job_id.clone()));
172 return Ok(true);
173 }
174 }
175 drop(queues);
176 let is_running = {
177 let running_jobs = self.running_jobs.read().map_err(|_| {
178 DeviceError::APIError("Lock poisoned on running_jobs in cancel_job".to_string())
179 })?;
180 running_jobs.contains_key(job_id)
181 };
182 if is_running {
183 self.update_job_status(job_id, JobStatus::Cancelled).await?;
184 let mut running_jobs = self.running_jobs.write().map_err(|_| {
185 DeviceError::APIError(
186 "Lock poisoned on running_jobs write in cancel_job".to_string(),
187 )
188 })?;
189 running_jobs.remove(job_id);
190 let _ = self
191 .event_sender
192 .send(SchedulerEvent::JobCancelled(job_id.clone()));
193 return Ok(true);
194 }
195 Ok(false)
196 }
197 pub async fn get_job_status<const N: usize>(
199 &self,
200 job_id: &JobId,
201 ) -> DeviceResult<Option<QuantumJob<N>>> {
202 let jobs = self
203 .jobs
204 .read()
205 .expect("Failed to acquire read lock on jobs in get_job_status");
206 if let Some(job_any) = jobs.get(job_id) {
207 if let Some(job) = job_any.downcast_ref::<QuantumJob<N>>() {
208 return Ok(Some(job.clone()));
209 }
210 }
211 Ok(None)
212 }
213 pub async fn get_queue_analytics(&self) -> DeviceResult<QueueAnalytics> {
215 let queues = self
216 .job_queues
217 .lock()
218 .expect("Failed to acquire lock on job_queues in get_queue_analytics");
219 let backend_performance = self
220 .backend_performance
221 .read()
222 .expect("Failed to acquire read lock on backend_performance in get_queue_analytics");
223 let total_queue_length = queues.values().map(|q| q.len()).sum();
224 let queue_by_priority = queues
225 .iter()
226 .map(|(priority, queue)| (*priority, queue.len()))
227 .collect();
228 let queue_by_backend = backend_performance
229 .iter()
230 .map(|(backend, perf)| (*backend, perf.queue_length))
231 .collect();
232 let predicted_queue_times = self.predict_queue_times(&backend_performance).await;
233 let system_load = self.calculate_system_load(&backend_performance).await;
234 let throughput = self.calculate_throughput().await;
235 let avg_wait_time = self.calculate_average_wait_time().await;
236 Ok(QueueAnalytics {
237 total_queue_length,
238 queue_by_priority,
239 queue_by_backend,
240 predicted_queue_times,
241 system_load,
242 throughput,
243 avg_wait_time,
244 })
245 }
246 pub async fn start_scheduler(&self) -> DeviceResult<()> {
248 let mut running = self
249 .scheduler_running
250 .lock()
251 .expect("Failed to acquire lock on scheduler_running in start_scheduler");
252 if *running {
253 return Err(DeviceError::APIError(
254 "Scheduler already running".to_string(),
255 ));
256 }
257 *running = true;
258 drop(running);
259 let scheduler = Arc::new(self.clone());
260 tokio::spawn(async move {
261 scheduler.scheduling_loop().await;
262 });
263 let scheduler = Arc::new(self.clone());
264 tokio::spawn(async move {
265 scheduler.performance_monitoring_loop().await;
266 });
267 let params = self
268 .params
269 .read()
270 .expect("Failed to acquire read lock on params in start_scheduler");
271 if params.scirs2_params.enabled {
272 drop(params);
273 let scheduler = Arc::new(self.clone());
274 tokio::spawn(async move {
275 scheduler.scirs2_optimization_loop().await;
276 });
277 }
278 Ok(())
279 }
280 pub async fn stop_scheduler(&self) -> DeviceResult<()> {
282 let mut running = self
283 .scheduler_running
284 .lock()
285 .expect("Failed to acquire lock on scheduler_running in stop_scheduler");
286 *running = false;
287 Ok(())
288 }
289 async fn validate_job_config(&self, config: &JobConfig) -> DeviceResult<()> {
290 let backends = self
291 .backends
292 .read()
293 .expect("Failed to acquire read lock on backends in validate_job_config");
294 if backends.is_empty() {
295 return Err(DeviceError::APIError("No backends available".to_string()));
296 }
297 let resource_manager = self
298 .resource_manager
299 .lock()
300 .expect("Failed to acquire lock on resource_manager in validate_job_config");
301 let mut can_satisfy = false;
302 for (backend, capacity) in &resource_manager.available_resources {
303 if capacity.qubits >= config.resource_requirements.min_qubits {
304 if let Some(max_depth) = config.resource_requirements.max_depth {
305 if let Some(backend_max_depth) = capacity.max_circuit_depth {
306 if max_depth > backend_max_depth {
307 continue;
308 }
309 }
310 }
311 can_satisfy = true;
312 break;
313 }
314 }
315 if !can_satisfy {
316 return Err(DeviceError::APIError(
317 "No backend can satisfy resource requirements".to_string(),
318 ));
319 }
320 Ok(())
321 }
322 async fn estimate_execution_time<const N: usize>(
323 &self,
324 circuit: &Circuit<N>,
325 shots: usize,
326 config: &JobConfig,
327 ) -> DeviceResult<Duration> {
328 let analyzer = CircuitAnalyzer::new();
329 let metrics = analyzer
330 .analyze(circuit)
331 .map_err(|e| DeviceError::APIError(format!("Circuit analysis error: {e:?}")))?;
332 let circuit_complexity = (metrics.gate_count as f64).mul_add(0.1, metrics.depth as f64);
333 let shots_factor = (shots as f64).log10();
334 let base_time = Duration::from_secs((circuit_complexity * shots_factor) as u64);
335 let backend_performance = self.backend_performance.read().expect(
336 "Failed to acquire read lock on backend_performance in estimate_execution_time",
337 );
338 let avg_execution_time = if backend_performance.is_empty() {
339 Duration::from_secs(60)
340 } else {
341 let total_time: Duration = backend_performance
342 .values()
343 .map(|p| p.avg_execution_time)
344 .sum();
345 total_time / backend_performance.len() as u32
346 };
347 let estimated = Duration::from_millis(
348 u128::midpoint(base_time.as_millis(), avg_execution_time.as_millis())
349 .try_into()
350 .expect(
351 "Failed to convert estimated execution time to u64 in estimate_execution_time",
352 ),
353 );
354 Ok(estimated)
355 }
356 async fn estimate_cost<const N: usize>(
357 &self,
358 circuit: &Circuit<N>,
359 shots: usize,
360 config: &JobConfig,
361 ) -> DeviceResult<f64> {
362 let analyzer = CircuitAnalyzer::new();
363 let metrics = analyzer
364 .analyze(circuit)
365 .map_err(|e| DeviceError::APIError(format!("Circuit analysis error: {e:?}")))?;
366 let circuit_complexity = metrics.depth as f64 + metrics.gate_count as f64;
367 let base_cost = circuit_complexity * shots as f64 * 0.001;
368 let priority_multiplier = match config.priority {
369 JobPriority::Critical => 3.0,
370 JobPriority::High => 2.0,
371 JobPriority::Normal => 1.0,
372 JobPriority::Low => 0.7,
373 JobPriority::BestEffort => 0.5,
374 };
375 Ok(base_cost * priority_multiplier)
376 }
377 async fn update_user_share(&self, user_id: &str, queued_delta: i32, running_delta: i32) {
378 let mut user_shares = self
379 .user_shares
380 .write()
381 .expect("Failed to acquire write lock on user_shares in update_user_share");
382 let share = user_shares
383 .entry(user_id.to_string())
384 .or_insert_with(|| UserShare {
385 user_id: user_id.to_string(),
386 allocated_share: 1.0,
387 used_share: 0.0,
388 jobs_running: 0,
389 jobs_queued: 0,
390 last_updated: SystemTime::now(),
391 });
392 share.jobs_queued = (share.jobs_queued as i32 + queued_delta).max(0) as usize;
393 share.jobs_running = (share.jobs_running as i32 + running_delta).max(0) as usize;
394 share.last_updated = SystemTime::now();
395 }
396 async fn update_job_status(&self, job_id: &JobId, status: JobStatus) -> DeviceResult<()> {
397 let mut status_map = self.job_status_map.write().map_err(|_| {
398 DeviceError::APIError("Failed to acquire write lock on job_status_map".to_string())
399 })?;
400 status_map.insert(job_id.clone(), status);
401 Ok(())
402 }
403 pub fn job_status(&self, job_id: &JobId) -> Option<JobStatus> {
405 let status_map = self.job_status_map.read().ok()?;
406 status_map.get(job_id).cloned()
407 }
408 pub async fn sort_queues_by_duration(&self) -> DeviceResult<()> {
410 let jobs_snapshot: HashMap<JobId, Option<std::time::Duration>> = {
411 let jobs = self
412 .jobs
413 .read()
414 .map_err(|_| DeviceError::APIError("Lock poisoned".to_string()))?;
415 jobs.keys()
416 .map(|id| (id.clone(), None::<std::time::Duration>))
417 .collect()
418 };
419 let mut queues = self
420 .job_queues
421 .lock()
422 .map_err(|_| DeviceError::APIError("Lock poisoned".to_string()))?;
423 for queue in queues.values_mut() {
424 queue.make_contiguous().sort_by(|a, b| {
425 let da = jobs_snapshot
426 .get(a)
427 .and_then(|d| *d)
428 .unwrap_or(std::time::Duration::MAX);
429 let db = jobs_snapshot
430 .get(b)
431 .and_then(|d| *d)
432 .unwrap_or(std::time::Duration::MAX);
433 da.cmp(&db)
434 });
435 }
436 Ok(())
437 }
438 pub async fn bin_pack_jobs(&self) -> DeviceResult<HashMap<HardwareBackend, Vec<JobId>>> {
442 let resource_manager = self
443 .resource_manager
444 .lock()
445 .map_err(|_| DeviceError::APIError("Lock poisoned".to_string()))?;
446 let queues = self
447 .job_queues
448 .lock()
449 .map_err(|_| DeviceError::APIError("Lock poisoned".to_string()))?;
450 let jobs = self
451 .jobs
452 .read()
453 .map_err(|_| DeviceError::APIError("Lock poisoned".to_string()))?;
454 let mut remaining_slots: HashMap<HardwareBackend, usize> = resource_manager
455 .available_resources
456 .iter()
457 .map(|(&b, cap)| (b, cap.concurrent_jobs))
458 .collect();
459 let mut assignment: HashMap<HardwareBackend, Vec<JobId>> = HashMap::new();
460 for queue in queues.values() {
461 for job_id in queue.iter() {
462 let best_backend = remaining_slots
463 .iter()
464 .filter(|(_, &slots)| slots > 0)
465 .max_by_key(|(_, &slots)| slots)
466 .map(|(&b, _)| b);
467 if let Some(backend) = best_backend {
468 assignment.entry(backend).or_default().push(job_id.clone());
469 if let Some(slots) = remaining_slots.get_mut(&backend) {
470 *slots = slots.saturating_sub(1);
471 }
472 }
473 }
474 }
475 Ok(assignment)
476 }
477 pub async fn route_to_least_loaded_backend(
479 &self,
480 job_id: &JobId,
481 ) -> DeviceResult<Option<HardwareBackend>> {
482 let backend_performance = self
483 .backend_performance
484 .read()
485 .map_err(|_| DeviceError::APIError("Lock poisoned".to_string()))?;
486 let chosen = backend_performance
487 .iter()
488 .filter(|_| true)
489 .min_by(|(_, a), (_, b)| {
490 a.queue_length.cmp(&b.queue_length).then_with(|| {
491 a.utilization
492 .partial_cmp(&b.utilization)
493 .unwrap_or(std::cmp::Ordering::Equal)
494 })
495 })
496 .map(|(&backend, _)| backend);
497 Ok(chosen)
498 }
499 async fn ensure_scheduler_running(&self) {
500 let running = self
501 .scheduler_running
502 .lock()
503 .expect("Failed to acquire lock on scheduler_running in ensure_scheduler_running");
504 if !*running {
505 drop(running);
506 let _ = self.start_scheduler().await;
507 }
508 }
509 async fn predict_queue_times(
510 &self,
511 backend_performance: &HashMap<HardwareBackend, BackendPerformance>,
512 ) -> HashMap<HardwareBackend, Duration> {
513 let mut predictions = HashMap::new();
514 for (backend, perf) in backend_performance {
515 let predicted_time = Duration::from_secs(
516 (perf.queue_length as u64 * perf.avg_execution_time.as_secs())
517 / perf.success_rate.max(0.1) as u64,
518 );
519 predictions.insert(*backend, predicted_time);
520 }
521 predictions
522 }
523 async fn calculate_system_load(
524 &self,
525 backend_performance: &HashMap<HardwareBackend, BackendPerformance>,
526 ) -> f64 {
527 if backend_performance.is_empty() {
528 return 0.0;
529 }
530 let total_utilization: f64 = backend_performance.values().map(|p| p.utilization).sum();
531 total_utilization / backend_performance.len() as f64
532 }
533 async fn calculate_throughput(&self) -> f64 {
534 let history = self
535 .execution_history
536 .read()
537 .expect("Failed to acquire read lock on execution_history in calculate_throughput");
538 if history.is_empty() {
539 return 0.0;
540 }
541 let one_hour_ago = SystemTime::now() - Duration::from_secs(3600);
542 let recent_completions = history
543 .iter()
544 .filter(|exec| exec.started_at > one_hour_ago)
545 .count();
546 recent_completions as f64
547 }
548 async fn calculate_average_wait_time(&self) -> Duration {
549 let history = self.execution_history.read().expect(
550 "Failed to acquire read lock on execution_history in calculate_average_wait_time",
551 );
552 if history.is_empty() {
553 return Duration::from_secs(0);
554 }
555 let total_wait: Duration = history.iter().map(|exec| exec.metrics.queue_time).sum();
556 total_wait / history.len() as u32
557 }
558 async fn scheduling_loop(&self) {
559 while *self
560 .scheduler_running
561 .lock()
562 .expect("Failed to acquire lock on scheduler_running in scheduling_loop")
563 {
564 if let Err(e) = self.schedule_next_jobs().await {
565 eprintln!("Scheduling error: {e}");
566 }
567 tokio::time::sleep(Duration::from_secs(1)).await;
568 }
569 }
570 async fn schedule_next_jobs(&self) -> DeviceResult<()> {
571 let params = self
572 .params
573 .read()
574 .expect("Failed to acquire read lock on params in schedule_next_jobs")
575 .clone();
576 match params.strategy {
577 SchedulingStrategy::PriorityFIFO => self.schedule_priority_fifo().await,
578 SchedulingStrategy::ShortestJobFirst => self.schedule_shortest_job_first().await,
579 SchedulingStrategy::FairShare => self.schedule_fair_share().await,
580 SchedulingStrategy::Backfill => self.schedule_backfill().await,
581 SchedulingStrategy::MLOptimized => self.schedule_ml_optimized().await,
582 _ => self.schedule_priority_fifo().await,
583 }
584 }
585 async fn schedule_priority_fifo(&self) -> DeviceResult<()> {
586 for priority in [
587 JobPriority::Critical,
588 JobPriority::High,
589 JobPriority::Normal,
590 JobPriority::Low,
591 JobPriority::BestEffort,
592 ] {
593 let job_id = {
594 let mut queues = self
595 .job_queues
596 .lock()
597 .expect("Failed to acquire lock on job_queues in schedule_priority_fifo");
598 queues
599 .get_mut(&priority)
600 .and_then(|queue| queue.pop_front())
601 };
602 if let Some(job_id) = job_id {
603 if let Some(backend) = self.find_best_backend(&job_id).await? {
604 self.assign_job_to_backend(&job_id, backend).await?;
605 break;
606 } else {
607 let mut queues = self
608 .job_queues
609 .lock()
610 .expect(
611 "Failed to acquire lock on job_queues to requeue job in schedule_priority_fifo",
612 );
613 if let Some(queue) = queues.get_mut(&priority) {
614 queue.push_front(job_id);
615 }
616 break;
617 }
618 }
619 }
620 Ok(())
621 }
622 async fn schedule_shortest_job_first(&self) -> DeviceResult<()> {
623 self.sort_queues_by_duration().await?;
624 self.schedule_priority_fifo().await
625 }
626 async fn schedule_fair_share(&self) -> DeviceResult<()> {
627 let user_shares_snapshot = {
628 let user_shares = self
629 .user_shares
630 .read()
631 .map_err(|_| DeviceError::APIError("Lock poisoned on user_shares".to_string()))?;
632 user_shares.clone()
633 };
634 let fairness_score = |user_id: &str| -> f64 {
635 user_shares_snapshot.get(user_id).map_or(0.0, |s| {
636 if s.allocated_share < 1e-10 {
637 f64::MAX
638 } else {
639 s.used_share / s.allocated_share
640 }
641 })
642 };
643 let mut candidate: Option<(JobPriority, usize, f64)> = None;
644 {
645 let queues = self
646 .job_queues
647 .lock()
648 .map_err(|_| DeviceError::APIError("Lock poisoned on job_queues".to_string()))?;
649 let jobs = self
650 .jobs
651 .read()
652 .map_err(|_| DeviceError::APIError("Lock poisoned on jobs".to_string()))?;
653 for (&priority, queue) in queues.iter() {
654 if let Some((pos, job_id)) = queue.iter().enumerate().next() {
655 let score = {
656 macro_rules! try_downcast {
657 ($n:expr) => {
658 jobs.get(job_id)
659 .and_then(|b| b.downcast_ref::<super::types::QuantumJob<$n>>())
660 .map(|j| fairness_score(&j.user_id))
661 };
662 }
663 try_downcast!(1)
664 .or_else(|| try_downcast!(2))
665 .or_else(|| try_downcast!(4))
666 .or_else(|| try_downcast!(8))
667 .or_else(|| try_downcast!(16))
668 .or_else(|| try_downcast!(32))
669 .or_else(|| try_downcast!(64))
670 .unwrap_or(0.0)
671 };
672 let better = candidate.as_ref().map_or(true, |&(_, _, s)| score < s);
673 if better {
674 candidate = Some((priority, pos, score));
675 }
676 }
677 }
678 }
679 if let Some((priority, pos, _)) = candidate {
680 let job_id =
681 {
682 let mut queues = self.job_queues.lock().map_err(|_| {
683 DeviceError::APIError("Lock poisoned on job_queues".to_string())
684 })?;
685 queues.get_mut(&priority).and_then(|q| {
686 if pos < q.len() {
687 q.remove(pos)
688 } else {
689 None
690 }
691 })
692 };
693 if let Some(job_id) = job_id {
694 if let Some(backend) = self.find_best_backend(&job_id).await? {
695 self.assign_job_to_backend(&job_id, backend).await?;
696 }
697 }
698 }
699 Ok(())
700 }
701 async fn schedule_backfill(&self) -> DeviceResult<()> {
702 self.sort_queues_by_duration().await?;
703 let assignment = self.bin_pack_jobs().await?;
704 for (backend, job_ids) in assignment {
705 for job_id in job_ids {
706 let removed = {
707 let mut queues = self.job_queues.lock().map_err(|_| {
708 DeviceError::APIError("Lock poisoned on job_queues".to_string())
709 })?;
710 let mut found = false;
711 for queue in queues.values_mut() {
712 if let Some(pos) = queue.iter().position(|id| id == &job_id) {
713 queue.remove(pos);
714 found = true;
715 break;
716 }
717 }
718 found
719 };
720 if removed && self.is_backend_available(backend).await {
721 self.assign_job_to_backend(&job_id, backend).await?;
722 }
723 }
724 }
725 Ok(())
726 }
    /// Scheduling pass for `SchedulingStrategy::MLOptimized`.
    ///
    /// With the `scirs2` feature enabled this delegates to `scirs2_optimize_schedule`;
    /// without it, it falls back to `schedule_priority_fifo`.
    async fn schedule_ml_optimized(&self) -> DeviceResult<()> {
        #[cfg(feature = "scirs2")]
        {
            self.scirs2_optimize_schedule().await
        }
        #[cfg(not(feature = "scirs2"))]
        {
            self.schedule_priority_fifo().await
        }
    }
    /// SciRS2-backed scheduling hook, compiled only with the `scirs2` feature.
    ///
    /// NOTE(review): currently this just runs `schedule_priority_fifo`; no SciRS2-based
    /// optimisation is performed here yet.
    #[cfg(feature = "scirs2")]
    async fn scirs2_optimize_schedule(&self) -> DeviceResult<()> {
        self.schedule_priority_fifo().await
    }
741 async fn find_best_backend(&self, job_id: &JobId) -> DeviceResult<Option<HardwareBackend>> {
742 {
743 let jobs = self
744 .jobs
745 .read()
746 .expect("Failed to acquire read lock on jobs in find_best_backend");
747 let _job_any = jobs
748 .get(job_id)
749 .ok_or_else(|| DeviceError::APIError("Job not found".to_string()))?;
750 }
751 let job_resource_requirements: Option<ResourceRequirements> = {
752 let config_map = self
753 .job_config_map
754 .read()
755 .expect("Failed to acquire read lock on job_config_map in find_best_backend");
756 config_map
757 .get(job_id)
758 .map(|cfg| cfg.resource_requirements.clone())
759 };
760 let backends: Vec<_> = {
761 let backends = self
762 .backends
763 .read()
764 .expect("Failed to acquire read lock on backends in find_best_backend");
765 backends.iter().copied().collect()
766 };
767 let allocation_strategy = {
768 let params = self
769 .params
770 .read()
771 .expect("Failed to acquire read lock on params in find_best_backend");
772 params.allocation_strategy.clone()
773 };
774 let backend_performance_snapshot = {
775 let backend_performance = self
776 .backend_performance
777 .read()
778 .expect("Failed to acquire read lock on backend_performance in find_best_backend");
779 backend_performance.clone()
780 };
781 match allocation_strategy {
782 AllocationStrategy::FirstFit => {
783 for backend in backends {
784 if self.is_backend_available(backend).await {
785 return Ok(Some(backend));
786 }
787 }
788 }
789 AllocationStrategy::BestFit => {
790 let (capacity_snapshot, required_qubits) = {
791 let resource_manager = self.resource_manager.lock().expect(
792 "Failed to acquire lock on resource_manager in find_best_backend BestFit",
793 );
794 let snapshot: HashMap<HardwareBackend, usize> = resource_manager
795 .available_resources
796 .iter()
797 .map(|(&b, cap)| (b, cap.qubits))
798 .collect();
799 let req = job_resource_requirements
800 .as_ref()
801 .map_or(1, |r| r.min_qubits);
802 (snapshot, req)
803 };
804 let mut best_backend: Option<HardwareBackend> = None;
805 let mut best_excess = usize::MAX;
806 for &backend in &backends {
807 if !self.is_backend_available(backend).await {
808 continue;
809 }
810 if let Some(&cap_qubits) = capacity_snapshot.get(&backend) {
811 if cap_qubits >= required_qubits {
812 let excess = cap_qubits - required_qubits;
813 if excess < best_excess {
814 best_excess = excess;
815 best_backend = Some(backend);
816 }
817 }
818 }
819 }
820 return Ok(best_backend);
821 }
822 AllocationStrategy::LeastLoaded => {
823 let mut best_backend = None;
824 let mut lowest_utilization = f64::INFINITY;
825 for (&backend, perf) in &backend_performance_snapshot {
826 if self.is_backend_available(backend).await
827 && perf.utilization < lowest_utilization
828 {
829 lowest_utilization = perf.utilization;
830 best_backend = Some(backend);
831 }
832 }
833 return Ok(best_backend);
834 }
835 _ => {
836 for &backend in &backends {
837 if self.is_backend_available(backend).await {
838 return Ok(Some(backend));
839 }
840 }
841 }
842 }
843 Ok(None)
844 }
845 async fn is_backend_available(&self, backend: HardwareBackend) -> bool {
846 let available = {
847 let running_jobs = self
848 .running_jobs
849 .read()
850 .expect("Failed to acquire read lock on running_jobs in is_backend_available");
851 let backend_jobs = running_jobs.values().filter(|(b, _)| *b == backend).count();
852 drop(running_jobs);
853 let resource_manager = self
854 .resource_manager
855 .lock()
856 .expect("Failed to acquire lock on resource_manager in is_backend_available");
857 let result = resource_manager
858 .available_resources
859 .get(&backend)
860 .is_some_and(|capacity| backend_jobs < capacity.concurrent_jobs);
861 drop(resource_manager);
862 result
863 };
864 available
865 }
866 async fn assign_job_to_backend(
867 &self,
868 job_id: &JobId,
869 backend: HardwareBackend,
870 ) -> DeviceResult<()> {
871 {
872 let mut running_jobs = self
873 .running_jobs
874 .write()
875 .expect("Failed to acquire write lock on running_jobs in assign_job_to_backend");
876 running_jobs.insert(job_id.clone(), (backend, SystemTime::now()));
877 }
878 self.update_job_status(job_id, JobStatus::Scheduled).await?;
879 let _ = self
880 .event_sender
881 .send(SchedulerEvent::JobScheduled(job_id.clone(), backend));
882 let job_id_clone = job_id.clone();
883 let scheduler = Arc::new(self.clone());
884 tokio::spawn(async move {
885 let _ = scheduler.execute_job(&job_id_clone, backend).await;
886 });
887 Ok(())
888 }
889 async fn execute_job(&self, job_id: &JobId, backend: HardwareBackend) -> DeviceResult<()> {
890 self.update_job_status(job_id, JobStatus::Running).await?;
891 let _ = self
892 .event_sender
893 .send(SchedulerEvent::JobStarted(job_id.clone()));
894 let execution_start = SystemTime::now();
895 {
896 let backends = self
897 .backends
898 .read()
899 .expect("Failed to acquire read lock on backends in execute_job");
900 if !backends.contains(&backend) {
901 return Err(DeviceError::APIError("Backend not found".to_string()));
902 }
903 }
904 let job_config = {
905 let config_map = self
906 .job_config_map
907 .read()
908 .expect("Failed to acquire read lock on job_config_map in execute_job");
909 config_map.get(job_id).cloned()
910 };
911 let queue_time = {
912 let jobs = self
913 .jobs
914 .read()
915 .expect("Failed to acquire read lock on jobs in execute_job queue_time");
916 macro_rules! try_submitted_at {
917 ($n:expr) => {
918 jobs.get(job_id)
919 .and_then(|b| b.downcast_ref::<super::types::QuantumJob<$n>>())
920 .map(|j| {
921 execution_start
922 .duration_since(j.submitted_at)
923 .unwrap_or(Duration::from_secs(0))
924 })
925 };
926 }
927 try_submitted_at!(1)
928 .or_else(|| try_submitted_at!(2))
929 .or_else(|| try_submitted_at!(4))
930 .or_else(|| try_submitted_at!(8))
931 .or_else(|| try_submitted_at!(16))
932 .or_else(|| try_submitted_at!(32))
933 .or_else(|| try_submitted_at!(64))
934 .or_else(|| try_submitted_at!(128))
935 .unwrap_or(Duration::from_secs(0))
936 };
937 {
938 let status_map = self.job_status_map.read().expect(
939 "Failed to acquire read lock on job_status_map in execute_job cancellation check",
940 );
941 if status_map.get(job_id) == Some(&JobStatus::Cancelled) {
942 return Ok(());
943 }
944 }
945 let simulated_execution_time = job_config
946 .as_ref()
947 .map(|_cfg| Duration::from_secs(1))
948 .unwrap_or(Duration::from_secs(1));
949 tokio::time::sleep(simulated_execution_time).await;
950 {
951 let mut running_jobs = self
952 .running_jobs
953 .write()
954 .expect("Failed to acquire write lock on running_jobs in execute_job cleanup");
955 running_jobs.remove(job_id);
956 }
957 self.update_job_status(job_id, JobStatus::Completed).await?;
958 let execution_end = SystemTime::now();
959 let execution_time = execution_end
960 .duration_since(execution_start)
961 .unwrap_or(Duration::from_secs(0));
962 let metrics = ExecutionMetrics {
963 queue_time,
964 execution_time: Some(execution_time),
965 resource_utilization: 1.0,
966 cost: job_config.as_ref().and_then(|c| c.cost_limit),
967 quality_metrics: {
968 let mut m = HashMap::new();
969 m.insert(
970 "execution_time_secs".to_string(),
971 execution_time.as_secs_f64(),
972 );
973 m.insert("queue_time_secs".to_string(), queue_time.as_secs_f64());
974 m
975 },
976 };
977 {
978 let mut metrics_map = self
979 .job_metrics_map
980 .write()
981 .expect("Failed to acquire write lock on job_metrics_map in execute_job");
982 metrics_map.insert(job_id.clone(), metrics.clone());
983 }
984 {
985 let mut perf_map = self
986 .backend_performance
987 .write()
988 .expect("Failed to acquire write lock on backend_performance in execute_job");
989 if let Some(perf) = perf_map.get_mut(&backend) {
990 let alpha = 0.2_f64;
991 let prev_exec_secs = perf.avg_execution_time.as_secs_f64();
992 let new_exec_secs =
993 (1.0 - alpha) * prev_exec_secs + alpha * execution_time.as_secs_f64();
994 perf.avg_execution_time = Duration::from_secs_f64(new_exec_secs.max(0.0));
995 let prev_queue_secs = perf.avg_queue_time.as_secs_f64();
996 let new_queue_secs =
997 (1.0 - alpha) * prev_queue_secs + alpha * queue_time.as_secs_f64();
998 perf.avg_queue_time = Duration::from_secs_f64(new_queue_secs.max(0.0));
999 perf.success_rate = (1.0 - alpha) * perf.success_rate + alpha * 1.0;
1000 perf.last_updated = execution_end;
1001 }
1002 }
1003 {
1004 let mut history = self
1005 .execution_history
1006 .write()
1007 .expect("Failed to acquire write lock on execution_history in execute_job");
1008 history.push(JobExecution {
1009 attempt: 1,
1010 backend,
1011 started_at: execution_start,
1012 ended_at: Some(execution_end),
1013 result: None,
1014 error: None,
1015 metrics,
1016 });
1017 }
1018 Ok(())
1019 }
1020 async fn performance_monitoring_loop(&self) {
1021 while *self
1022 .scheduler_running
1023 .lock()
1024 .expect("Failed to acquire lock on scheduler_running in performance_monitoring_loop")
1025 {
1026 self.update_backend_performance().await;
1027 tokio::time::sleep(Duration::from_secs(30)).await;
1028 }
1029 }
1030 async fn update_backend_performance(&self) {
1031 let mut backend_performance = self.backend_performance.write().expect(
1032 "Failed to acquire write lock on backend_performance in update_backend_performance",
1033 );
1034 let now = SystemTime::now();
1035 for (backend, perf) in backend_performance.iter_mut() {
1036 perf.last_updated = now;
1037 let snapshot = PerformanceSnapshot {
1038 timestamp: now,
1039 queue_length: perf.queue_length,
1040 utilization: perf.utilization,
1041 avg_queue_time_secs: perf.avg_queue_time.as_secs_f64(),
1042 success_rate: perf.success_rate,
1043 };
1044 perf.history.push_back(snapshot);
1045 let cutoff = now - Duration::from_secs(86400);
1046 while let Some(front) = perf.history.front() {
1047 if front.timestamp < cutoff {
1048 perf.history.pop_front();
1049 } else {
1050 break;
1051 }
1052 }
1053 }
1054 }
1055 async fn scirs2_optimization_loop(&self) {
1056 let frequency = {
1057 let params = self
1058 .params
1059 .read()
1060 .expect("Failed to acquire read lock on params in scirs2_optimization_loop");
1061 params.scirs2_params.optimization_frequency
1062 };
1063 loop {
1064 let should_continue = *self
1065 .scheduler_running
1066 .lock()
1067 .expect("Failed to acquire lock on scheduler_running in scirs2_optimization_loop");
1068 if !should_continue {
1069 break;
1070 }
1071 if let Err(e) = self.run_scirs2_optimization().await {
1072 eprintln!("SciRS2 optimization error: {e}");
1073 }
1074 tokio::time::sleep(frequency).await;
1075 }
1076 }
1077 async fn run_scirs2_optimization(&self) -> DeviceResult<()> {
1078 #[cfg(feature = "scirs2")]
1079 {
1080 let backend_snapshot: Vec<(HardwareBackend, f64)> = {
1081 let bp = self.backend_performance.read().expect(
1082 "Failed to acquire read lock on backend_performance in run_scirs2_optimization",
1083 );
1084 bp.iter().map(|(&b, p)| (b, p.utilization)).collect()
1085 };
1086 let performance_data: Vec<f64> = backend_snapshot.iter().map(|(_, u)| *u).collect();
1087 if performance_data.len() > 1 {
1088 use scirs2_core::ndarray::Array1;
1089 let data_array = Array1::from_vec(performance_data);
1090 let avg_utilization: f64 = mean(&data_array.view()).unwrap_or(0.5);
1091 let utilization_std: f64 = stats_std(&data_array.view(), 1, None).unwrap_or(0.1);
1092 let overload_threshold = avg_utilization + utilization_std;
1093 let underload_threshold = (avg_utilization - utilization_std).max(0.0);
1094 let overloaded: Vec<HardwareBackend> = backend_snapshot
1095 .iter()
1096 .filter(|(_, u)| *u > overload_threshold)
1097 .map(|(b, _)| *b)
1098 .collect();
1099 let underloaded: Vec<(HardwareBackend, f64)> = backend_snapshot
1100 .iter()
1101 .filter(|(_, u)| *u < underload_threshold)
1102 .copied()
1103 .collect();
1104 if underloaded.is_empty() {
1105 return Ok(());
1106 }
1107 for overloaded_backend in overloaded {
1108 let target_backend = underloaded
1109 .iter()
1110 .min_by(|(_, ua), (_, ub)| {
1111 ua.partial_cmp(ub).unwrap_or(std::cmp::Ordering::Equal)
1112 })
1113 .map(|(b, _)| *b)
1114 .unwrap_or(underloaded[0].0);
1115 let candidate_job_id: Option<JobId> = {
1116 let queues = self
1117 .job_queues
1118 .lock()
1119 .expect("Failed to acquire lock on job_queues in load balancing");
1120 let config_map = self
1121 .job_config_map
1122 .read()
1123 .expect("Failed to read job_config_map in load balancing");
1124 let mut found = None;
1125 'outer: for queue in queues.values() {
1126 for job_id in queue.iter() {
1127 if let Some(cfg) = config_map.get(job_id) {
1128 if cfg.preferred_backends.first() == Some(&overloaded_backend) {
1129 found = Some(job_id.clone());
1130 break 'outer;
1131 }
1132 }
1133 }
1134 }
1135 found
1136 };
1137 if let Some(job_id) = candidate_job_id {
1138 let mut config_map = self
1139 .job_config_map
1140 .write()
1141 .expect("Failed to write job_config_map in load balancing");
1142 if let Some(cfg) = config_map.get_mut(&job_id) {
1143 cfg.preferred_backends.insert(0, target_backend);
1144 }
1145 }
1146 }
1147 }
1148 }
1149 Ok(())
1150 }
1151}