Skip to main content

quantrs2_device/job_scheduling/
impls_extended.rs

1//! Auto-generated module
2//!
3//! 🤖 Generated with [SplitRS](https://github.com/cool-japan/splitrs)
4
5use crate::{
6    backend_traits::query_backend_capabilities, translation::HardwareBackend, CircuitResult,
7    DeviceError, DeviceResult,
8};
9use quantrs2_circuit::{optimization::analysis::CircuitAnalyzer, prelude::Circuit};
10use std::collections::{BTreeMap, HashMap, HashSet, VecDeque};
11use std::sync::{Arc, Mutex, RwLock};
12use std::time::{Duration, SystemTime};
13use tokio::sync::mpsc;
14// SciRS2 dependencies for optimization algorithms
15#[cfg(not(feature = "scirs2"))]
16use super::fallback_scirs2::std as stats_std;
17#[cfg(not(feature = "scirs2"))]
18use super::fallback_scirs2::{mean, minimize, OptimizeResult};
19use super::types::*;
20#[cfg(feature = "scirs2")]
21use scirs2_stats::{mean, std as stats_std};
22
23impl QuantumJobScheduler {
24    /// Create a new quantum job scheduler
25    pub fn new(params: SchedulingParams) -> Self {
26        let (event_sender, _) = mpsc::unbounded_channel();
27        Self {
28            params: Arc::new(RwLock::new(params)),
29            job_queues: Arc::new(Mutex::new(BTreeMap::new())),
30            jobs: Arc::new(RwLock::new(HashMap::new())),
31            backend_performance: Arc::new(RwLock::new(HashMap::new())),
32            backends: Arc::new(RwLock::new(HashSet::new())),
33            running_jobs: Arc::new(RwLock::new(HashMap::new())),
34            execution_history: Arc::new(RwLock::new(Vec::new())),
35            user_shares: Arc::new(RwLock::new(HashMap::new())),
36            scheduler_running: Arc::new(Mutex::new(false)),
37            event_sender,
38            performance_predictor: Arc::new(Mutex::new(PerformancePredictor::new())),
39            resource_manager: Arc::new(Mutex::new(ResourceManager::new())),
40            job_status_map: Arc::new(RwLock::new(HashMap::new())),
41            job_config_map: Arc::new(RwLock::new(HashMap::new())),
42            job_metrics_map: Arc::new(RwLock::new(HashMap::new())),
43        }
44    }
45    /// Register a backend
46    pub async fn register_backend(&self, backend: HardwareBackend) -> DeviceResult<()> {
47        let mut backends = self
48            .backends
49            .write()
50            .expect("Failed to acquire write lock on backends in register_backend");
51        backends.insert(backend);
52        let mut performance = self
53            .backend_performance
54            .write()
55            .expect("Failed to acquire write lock on backend_performance in register_backend");
56        performance.insert(
57            backend,
58            BackendPerformance {
59                backend,
60                queue_length: 0,
61                avg_queue_time: Duration::from_secs(0),
62                avg_execution_time: Duration::from_secs(0),
63                success_rate: 1.0,
64                utilization: 0.0,
65                avg_cost: None,
66                last_updated: SystemTime::now(),
67                history: VecDeque::new(),
68            },
69        );
70        let capabilities = query_backend_capabilities(backend);
71        let mut resource_manager = self
72            .resource_manager
73            .lock()
74            .expect("Failed to acquire lock on resource_manager in register_backend");
75        resource_manager.available_resources.insert(
76            backend,
77            ResourceCapacity {
78                qubits: capabilities.features.max_qubits,
79                max_circuit_depth: capabilities.features.max_depth,
80                memory_mb: 8192,
81                cpu_cores: 4,
82                concurrent_jobs: 10,
83                features: capabilities
84                    .features
85                    .supported_measurement_bases
86                    .into_iter()
87                    .collect(),
88            },
89        );
90        Ok(())
91    }
92    /// Get list of available backends
93    pub fn get_available_backends(&self) -> Vec<HardwareBackend> {
94        let backends = self
95            .backends
96            .read()
97            .expect("Failed to acquire read lock on backends in get_available_backends");
98        backends.iter().copied().collect()
99    }
100    /// Submit a quantum job for execution
101    pub async fn submit_job<const N: usize>(
102        &self,
103        circuit: Circuit<N>,
104        shots: usize,
105        config: JobConfig,
106        user_id: String,
107    ) -> DeviceResult<JobId> {
108        let job_id = JobId::new();
109        let now = SystemTime::now();
110        self.validate_job_config(&config).await?;
111        let estimated_duration = self
112            .estimate_execution_time(&circuit, shots, &config)
113            .await?;
114        let estimated_cost = self.estimate_cost(&circuit, shots, &config).await?;
115        let job = QuantumJob {
116            id: job_id.clone(),
117            config,
118            circuit,
119            shots,
120            submitted_at: now,
121            status: JobStatus::Pending,
122            execution_history: vec![],
123            metadata: HashMap::new(),
124            user_id: user_id.clone(),
125            group_id: None,
126            estimated_duration: Some(estimated_duration),
127            assigned_backend: None,
128            estimated_cost: Some(estimated_cost),
129            actual_cost: None,
130        };
131        let mut jobs = self
132            .jobs
133            .write()
134            .expect("Failed to acquire write lock on jobs in submit_job");
135        jobs.insert(job_id.clone(), Box::new(job.clone()));
136        drop(jobs);
137        {
138            let mut config_map = self
139                .job_config_map
140                .write()
141                .expect("Failed to acquire write lock on job_config_map in submit_job");
142            config_map.insert(job_id.clone(), job.config.clone());
143        }
144        let mut queues = self
145            .job_queues
146            .lock()
147            .expect("Failed to acquire lock on job_queues in submit_job");
148        let queue = queues.entry(job.config.priority).or_default();
149        queue.push_back(job_id.clone());
150        drop(queues);
151        self.update_user_share(&user_id, 1, 0).await;
152        let _ = self
153            .event_sender
154            .send(SchedulerEvent::JobSubmitted(job_id.clone()));
155        self.ensure_scheduler_running().await;
156        Ok(job_id)
157    }
158    /// Cancel a queued or running job
159    pub async fn cancel_job(&self, job_id: &JobId) -> DeviceResult<bool> {
160        let mut queues = self
161            .job_queues
162            .lock()
163            .expect("Failed to acquire lock on job_queues in cancel_job");
164        for queue in queues.values_mut() {
165            if let Some(pos) = queue.iter().position(|id| id == job_id) {
166                queue.remove(pos);
167                drop(queues);
168                self.update_job_status(job_id, JobStatus::Cancelled).await?;
169                let _ = self
170                    .event_sender
171                    .send(SchedulerEvent::JobCancelled(job_id.clone()));
172                return Ok(true);
173            }
174        }
175        drop(queues);
176        let is_running = {
177            let running_jobs = self.running_jobs.read().map_err(|_| {
178                DeviceError::APIError("Lock poisoned on running_jobs in cancel_job".to_string())
179            })?;
180            running_jobs.contains_key(job_id)
181        };
182        if is_running {
183            self.update_job_status(job_id, JobStatus::Cancelled).await?;
184            let mut running_jobs = self.running_jobs.write().map_err(|_| {
185                DeviceError::APIError(
186                    "Lock poisoned on running_jobs write in cancel_job".to_string(),
187                )
188            })?;
189            running_jobs.remove(job_id);
190            let _ = self
191                .event_sender
192                .send(SchedulerEvent::JobCancelled(job_id.clone()));
193            return Ok(true);
194        }
195        Ok(false)
196    }
197    /// Get job status and information
198    pub async fn get_job_status<const N: usize>(
199        &self,
200        job_id: &JobId,
201    ) -> DeviceResult<Option<QuantumJob<N>>> {
202        let jobs = self
203            .jobs
204            .read()
205            .expect("Failed to acquire read lock on jobs in get_job_status");
206        if let Some(job_any) = jobs.get(job_id) {
207            if let Some(job) = job_any.downcast_ref::<QuantumJob<N>>() {
208                return Ok(Some(job.clone()));
209            }
210        }
211        Ok(None)
212    }
213    /// Get queue analytics and predictions
214    pub async fn get_queue_analytics(&self) -> DeviceResult<QueueAnalytics> {
215        let queues = self
216            .job_queues
217            .lock()
218            .expect("Failed to acquire lock on job_queues in get_queue_analytics");
219        let backend_performance = self
220            .backend_performance
221            .read()
222            .expect("Failed to acquire read lock on backend_performance in get_queue_analytics");
223        let total_queue_length = queues.values().map(|q| q.len()).sum();
224        let queue_by_priority = queues
225            .iter()
226            .map(|(priority, queue)| (*priority, queue.len()))
227            .collect();
228        let queue_by_backend = backend_performance
229            .iter()
230            .map(|(backend, perf)| (*backend, perf.queue_length))
231            .collect();
232        let predicted_queue_times = self.predict_queue_times(&backend_performance).await;
233        let system_load = self.calculate_system_load(&backend_performance).await;
234        let throughput = self.calculate_throughput().await;
235        let avg_wait_time = self.calculate_average_wait_time().await;
236        Ok(QueueAnalytics {
237            total_queue_length,
238            queue_by_priority,
239            queue_by_backend,
240            predicted_queue_times,
241            system_load,
242            throughput,
243            avg_wait_time,
244        })
245    }
246    /// Start the job scheduler
247    pub async fn start_scheduler(&self) -> DeviceResult<()> {
248        let mut running = self
249            .scheduler_running
250            .lock()
251            .expect("Failed to acquire lock on scheduler_running in start_scheduler");
252        if *running {
253            return Err(DeviceError::APIError(
254                "Scheduler already running".to_string(),
255            ));
256        }
257        *running = true;
258        drop(running);
259        let scheduler = Arc::new(self.clone());
260        tokio::spawn(async move {
261            scheduler.scheduling_loop().await;
262        });
263        let scheduler = Arc::new(self.clone());
264        tokio::spawn(async move {
265            scheduler.performance_monitoring_loop().await;
266        });
267        let params = self
268            .params
269            .read()
270            .expect("Failed to acquire read lock on params in start_scheduler");
271        if params.scirs2_params.enabled {
272            drop(params);
273            let scheduler = Arc::new(self.clone());
274            tokio::spawn(async move {
275                scheduler.scirs2_optimization_loop().await;
276            });
277        }
278        Ok(())
279    }
280    /// Stop the job scheduler
281    pub async fn stop_scheduler(&self) -> DeviceResult<()> {
282        let mut running = self
283            .scheduler_running
284            .lock()
285            .expect("Failed to acquire lock on scheduler_running in stop_scheduler");
286        *running = false;
287        Ok(())
288    }
289    async fn validate_job_config(&self, config: &JobConfig) -> DeviceResult<()> {
290        let backends = self
291            .backends
292            .read()
293            .expect("Failed to acquire read lock on backends in validate_job_config");
294        if backends.is_empty() {
295            return Err(DeviceError::APIError("No backends available".to_string()));
296        }
297        let resource_manager = self
298            .resource_manager
299            .lock()
300            .expect("Failed to acquire lock on resource_manager in validate_job_config");
301        let mut can_satisfy = false;
302        for (backend, capacity) in &resource_manager.available_resources {
303            if capacity.qubits >= config.resource_requirements.min_qubits {
304                if let Some(max_depth) = config.resource_requirements.max_depth {
305                    if let Some(backend_max_depth) = capacity.max_circuit_depth {
306                        if max_depth > backend_max_depth {
307                            continue;
308                        }
309                    }
310                }
311                can_satisfy = true;
312                break;
313            }
314        }
315        if !can_satisfy {
316            return Err(DeviceError::APIError(
317                "No backend can satisfy resource requirements".to_string(),
318            ));
319        }
320        Ok(())
321    }
322    async fn estimate_execution_time<const N: usize>(
323        &self,
324        circuit: &Circuit<N>,
325        shots: usize,
326        config: &JobConfig,
327    ) -> DeviceResult<Duration> {
328        let analyzer = CircuitAnalyzer::new();
329        let metrics = analyzer
330            .analyze(circuit)
331            .map_err(|e| DeviceError::APIError(format!("Circuit analysis error: {e:?}")))?;
332        let circuit_complexity = (metrics.gate_count as f64).mul_add(0.1, metrics.depth as f64);
333        let shots_factor = (shots as f64).log10();
334        let base_time = Duration::from_secs((circuit_complexity * shots_factor) as u64);
335        let backend_performance = self.backend_performance.read().expect(
336            "Failed to acquire read lock on backend_performance in estimate_execution_time",
337        );
338        let avg_execution_time = if backend_performance.is_empty() {
339            Duration::from_secs(60)
340        } else {
341            let total_time: Duration = backend_performance
342                .values()
343                .map(|p| p.avg_execution_time)
344                .sum();
345            total_time / backend_performance.len() as u32
346        };
347        let estimated = Duration::from_millis(
348            u128::midpoint(base_time.as_millis(), avg_execution_time.as_millis())
349                .try_into()
350                .expect(
351                    "Failed to convert estimated execution time to u64 in estimate_execution_time",
352                ),
353        );
354        Ok(estimated)
355    }
356    async fn estimate_cost<const N: usize>(
357        &self,
358        circuit: &Circuit<N>,
359        shots: usize,
360        config: &JobConfig,
361    ) -> DeviceResult<f64> {
362        let analyzer = CircuitAnalyzer::new();
363        let metrics = analyzer
364            .analyze(circuit)
365            .map_err(|e| DeviceError::APIError(format!("Circuit analysis error: {e:?}")))?;
366        let circuit_complexity = metrics.depth as f64 + metrics.gate_count as f64;
367        let base_cost = circuit_complexity * shots as f64 * 0.001;
368        let priority_multiplier = match config.priority {
369            JobPriority::Critical => 3.0,
370            JobPriority::High => 2.0,
371            JobPriority::Normal => 1.0,
372            JobPriority::Low => 0.7,
373            JobPriority::BestEffort => 0.5,
374        };
375        Ok(base_cost * priority_multiplier)
376    }
377    async fn update_user_share(&self, user_id: &str, queued_delta: i32, running_delta: i32) {
378        let mut user_shares = self
379            .user_shares
380            .write()
381            .expect("Failed to acquire write lock on user_shares in update_user_share");
382        let share = user_shares
383            .entry(user_id.to_string())
384            .or_insert_with(|| UserShare {
385                user_id: user_id.to_string(),
386                allocated_share: 1.0,
387                used_share: 0.0,
388                jobs_running: 0,
389                jobs_queued: 0,
390                last_updated: SystemTime::now(),
391            });
392        share.jobs_queued = (share.jobs_queued as i32 + queued_delta).max(0) as usize;
393        share.jobs_running = (share.jobs_running as i32 + running_delta).max(0) as usize;
394        share.last_updated = SystemTime::now();
395    }
396    async fn update_job_status(&self, job_id: &JobId, status: JobStatus) -> DeviceResult<()> {
397        let mut status_map = self.job_status_map.write().map_err(|_| {
398            DeviceError::APIError("Failed to acquire write lock on job_status_map".to_string())
399        })?;
400        status_map.insert(job_id.clone(), status);
401        Ok(())
402    }
403    /// Query the current status of a job without needing the circuit type parameter.
404    pub fn job_status(&self, job_id: &JobId) -> Option<JobStatus> {
405        let status_map = self.job_status_map.read().ok()?;
406        status_map.get(job_id).cloned()
407    }
408    /// Sort all pending jobs in all priority queues by estimated duration (shortest first)
409    pub async fn sort_queues_by_duration(&self) -> DeviceResult<()> {
410        let jobs_snapshot: HashMap<JobId, Option<std::time::Duration>> = {
411            let jobs = self
412                .jobs
413                .read()
414                .map_err(|_| DeviceError::APIError("Lock poisoned".to_string()))?;
415            jobs.keys()
416                .map(|id| (id.clone(), None::<std::time::Duration>))
417                .collect()
418        };
419        let mut queues = self
420            .job_queues
421            .lock()
422            .map_err(|_| DeviceError::APIError("Lock poisoned".to_string()))?;
423        for queue in queues.values_mut() {
424            queue.make_contiguous().sort_by(|a, b| {
425                let da = jobs_snapshot
426                    .get(a)
427                    .and_then(|d| *d)
428                    .unwrap_or(std::time::Duration::MAX);
429                let db = jobs_snapshot
430                    .get(b)
431                    .and_then(|d| *d)
432                    .unwrap_or(std::time::Duration::MAX);
433                da.cmp(&db)
434            });
435        }
436        Ok(())
437    }
438    /// Bin-pack jobs into backend slots by resource requirements.
439    ///
440    /// Returns a mapping from backend → list of job IDs assigned to it.
441    pub async fn bin_pack_jobs(&self) -> DeviceResult<HashMap<HardwareBackend, Vec<JobId>>> {
442        let resource_manager = self
443            .resource_manager
444            .lock()
445            .map_err(|_| DeviceError::APIError("Lock poisoned".to_string()))?;
446        let queues = self
447            .job_queues
448            .lock()
449            .map_err(|_| DeviceError::APIError("Lock poisoned".to_string()))?;
450        let jobs = self
451            .jobs
452            .read()
453            .map_err(|_| DeviceError::APIError("Lock poisoned".to_string()))?;
454        let mut remaining_slots: HashMap<HardwareBackend, usize> = resource_manager
455            .available_resources
456            .iter()
457            .map(|(&b, cap)| (b, cap.concurrent_jobs))
458            .collect();
459        let mut assignment: HashMap<HardwareBackend, Vec<JobId>> = HashMap::new();
460        for queue in queues.values() {
461            for job_id in queue.iter() {
462                let best_backend = remaining_slots
463                    .iter()
464                    .filter(|(_, &slots)| slots > 0)
465                    .max_by_key(|(_, &slots)| slots)
466                    .map(|(&b, _)| b);
467                if let Some(backend) = best_backend {
468                    assignment.entry(backend).or_default().push(job_id.clone());
469                    if let Some(slots) = remaining_slots.get_mut(&backend) {
470                        *slots = slots.saturating_sub(1);
471                    }
472                }
473            }
474        }
475        Ok(assignment)
476    }
477    /// Route a single job to the backend with the lowest current load (queue_length).
478    pub async fn route_to_least_loaded_backend(
479        &self,
480        job_id: &JobId,
481    ) -> DeviceResult<Option<HardwareBackend>> {
482        let backend_performance = self
483            .backend_performance
484            .read()
485            .map_err(|_| DeviceError::APIError("Lock poisoned".to_string()))?;
486        let chosen = backend_performance
487            .iter()
488            .filter(|_| true)
489            .min_by(|(_, a), (_, b)| {
490                a.queue_length.cmp(&b.queue_length).then_with(|| {
491                    a.utilization
492                        .partial_cmp(&b.utilization)
493                        .unwrap_or(std::cmp::Ordering::Equal)
494                })
495            })
496            .map(|(&backend, _)| backend);
497        Ok(chosen)
498    }
499    async fn ensure_scheduler_running(&self) {
500        let running = self
501            .scheduler_running
502            .lock()
503            .expect("Failed to acquire lock on scheduler_running in ensure_scheduler_running");
504        if !*running {
505            drop(running);
506            let _ = self.start_scheduler().await;
507        }
508    }
509    async fn predict_queue_times(
510        &self,
511        backend_performance: &HashMap<HardwareBackend, BackendPerformance>,
512    ) -> HashMap<HardwareBackend, Duration> {
513        let mut predictions = HashMap::new();
514        for (backend, perf) in backend_performance {
515            let predicted_time = Duration::from_secs(
516                (perf.queue_length as u64 * perf.avg_execution_time.as_secs())
517                    / perf.success_rate.max(0.1) as u64,
518            );
519            predictions.insert(*backend, predicted_time);
520        }
521        predictions
522    }
523    async fn calculate_system_load(
524        &self,
525        backend_performance: &HashMap<HardwareBackend, BackendPerformance>,
526    ) -> f64 {
527        if backend_performance.is_empty() {
528            return 0.0;
529        }
530        let total_utilization: f64 = backend_performance.values().map(|p| p.utilization).sum();
531        total_utilization / backend_performance.len() as f64
532    }
533    async fn calculate_throughput(&self) -> f64 {
534        let history = self
535            .execution_history
536            .read()
537            .expect("Failed to acquire read lock on execution_history in calculate_throughput");
538        if history.is_empty() {
539            return 0.0;
540        }
541        let one_hour_ago = SystemTime::now() - Duration::from_secs(3600);
542        let recent_completions = history
543            .iter()
544            .filter(|exec| exec.started_at > one_hour_ago)
545            .count();
546        recent_completions as f64
547    }
548    async fn calculate_average_wait_time(&self) -> Duration {
549        let history = self.execution_history.read().expect(
550            "Failed to acquire read lock on execution_history in calculate_average_wait_time",
551        );
552        if history.is_empty() {
553            return Duration::from_secs(0);
554        }
555        let total_wait: Duration = history.iter().map(|exec| exec.metrics.queue_time).sum();
556        total_wait / history.len() as u32
557    }
558    async fn scheduling_loop(&self) {
559        while *self
560            .scheduler_running
561            .lock()
562            .expect("Failed to acquire lock on scheduler_running in scheduling_loop")
563        {
564            if let Err(e) = self.schedule_next_jobs().await {
565                eprintln!("Scheduling error: {e}");
566            }
567            tokio::time::sleep(Duration::from_secs(1)).await;
568        }
569    }
570    async fn schedule_next_jobs(&self) -> DeviceResult<()> {
571        let params = self
572            .params
573            .read()
574            .expect("Failed to acquire read lock on params in schedule_next_jobs")
575            .clone();
576        match params.strategy {
577            SchedulingStrategy::PriorityFIFO => self.schedule_priority_fifo().await,
578            SchedulingStrategy::ShortestJobFirst => self.schedule_shortest_job_first().await,
579            SchedulingStrategy::FairShare => self.schedule_fair_share().await,
580            SchedulingStrategy::Backfill => self.schedule_backfill().await,
581            SchedulingStrategy::MLOptimized => self.schedule_ml_optimized().await,
582            _ => self.schedule_priority_fifo().await,
583        }
584    }
585    async fn schedule_priority_fifo(&self) -> DeviceResult<()> {
586        for priority in [
587            JobPriority::Critical,
588            JobPriority::High,
589            JobPriority::Normal,
590            JobPriority::Low,
591            JobPriority::BestEffort,
592        ] {
593            let job_id = {
594                let mut queues = self
595                    .job_queues
596                    .lock()
597                    .expect("Failed to acquire lock on job_queues in schedule_priority_fifo");
598                queues
599                    .get_mut(&priority)
600                    .and_then(|queue| queue.pop_front())
601            };
602            if let Some(job_id) = job_id {
603                if let Some(backend) = self.find_best_backend(&job_id).await? {
604                    self.assign_job_to_backend(&job_id, backend).await?;
605                    break;
606                } else {
607                    let mut queues = self
608                        .job_queues
609                        .lock()
610                        .expect(
611                            "Failed to acquire lock on job_queues to requeue job in schedule_priority_fifo",
612                        );
613                    if let Some(queue) = queues.get_mut(&priority) {
614                        queue.push_front(job_id);
615                    }
616                    break;
617                }
618            }
619        }
620        Ok(())
621    }
622    async fn schedule_shortest_job_first(&self) -> DeviceResult<()> {
623        self.sort_queues_by_duration().await?;
624        self.schedule_priority_fifo().await
625    }
626    async fn schedule_fair_share(&self) -> DeviceResult<()> {
627        let user_shares_snapshot = {
628            let user_shares = self
629                .user_shares
630                .read()
631                .map_err(|_| DeviceError::APIError("Lock poisoned on user_shares".to_string()))?;
632            user_shares.clone()
633        };
634        let fairness_score = |user_id: &str| -> f64 {
635            user_shares_snapshot.get(user_id).map_or(0.0, |s| {
636                if s.allocated_share < 1e-10 {
637                    f64::MAX
638                } else {
639                    s.used_share / s.allocated_share
640                }
641            })
642        };
643        let mut candidate: Option<(JobPriority, usize, f64)> = None;
644        {
645            let queues = self
646                .job_queues
647                .lock()
648                .map_err(|_| DeviceError::APIError("Lock poisoned on job_queues".to_string()))?;
649            let jobs = self
650                .jobs
651                .read()
652                .map_err(|_| DeviceError::APIError("Lock poisoned on jobs".to_string()))?;
653            for (&priority, queue) in queues.iter() {
654                if let Some((pos, job_id)) = queue.iter().enumerate().next() {
655                    let score = {
656                        macro_rules! try_downcast {
657                            ($n:expr) => {
658                                jobs.get(job_id)
659                                    .and_then(|b| b.downcast_ref::<super::types::QuantumJob<$n>>())
660                                    .map(|j| fairness_score(&j.user_id))
661                            };
662                        }
663                        try_downcast!(1)
664                            .or_else(|| try_downcast!(2))
665                            .or_else(|| try_downcast!(4))
666                            .or_else(|| try_downcast!(8))
667                            .or_else(|| try_downcast!(16))
668                            .or_else(|| try_downcast!(32))
669                            .or_else(|| try_downcast!(64))
670                            .unwrap_or(0.0)
671                    };
672                    let better = candidate.as_ref().map_or(true, |&(_, _, s)| score < s);
673                    if better {
674                        candidate = Some((priority, pos, score));
675                    }
676                }
677            }
678        }
679        if let Some((priority, pos, _)) = candidate {
680            let job_id =
681                {
682                    let mut queues = self.job_queues.lock().map_err(|_| {
683                        DeviceError::APIError("Lock poisoned on job_queues".to_string())
684                    })?;
685                    queues.get_mut(&priority).and_then(|q| {
686                        if pos < q.len() {
687                            q.remove(pos)
688                        } else {
689                            None
690                        }
691                    })
692                };
693            if let Some(job_id) = job_id {
694                if let Some(backend) = self.find_best_backend(&job_id).await? {
695                    self.assign_job_to_backend(&job_id, backend).await?;
696                }
697            }
698        }
699        Ok(())
700    }
701    async fn schedule_backfill(&self) -> DeviceResult<()> {
702        self.sort_queues_by_duration().await?;
703        let assignment = self.bin_pack_jobs().await?;
704        for (backend, job_ids) in assignment {
705            for job_id in job_ids {
706                let removed = {
707                    let mut queues = self.job_queues.lock().map_err(|_| {
708                        DeviceError::APIError("Lock poisoned on job_queues".to_string())
709                    })?;
710                    let mut found = false;
711                    for queue in queues.values_mut() {
712                        if let Some(pos) = queue.iter().position(|id| id == &job_id) {
713                            queue.remove(pos);
714                            found = true;
715                            break;
716                        }
717                    }
718                    found
719                };
720                if removed && self.is_backend_available(backend).await {
721                    self.assign_job_to_backend(&job_id, backend).await?;
722                }
723            }
724        }
725        Ok(())
726    }
727    async fn schedule_ml_optimized(&self) -> DeviceResult<()> {
728        #[cfg(feature = "scirs2")]
729        {
730            self.scirs2_optimize_schedule().await
731        }
732        #[cfg(not(feature = "scirs2"))]
733        {
734            self.schedule_priority_fifo().await
735        }
736    }
737    #[cfg(feature = "scirs2")]
738    async fn scirs2_optimize_schedule(&self) -> DeviceResult<()> {
739        self.schedule_priority_fifo().await
740    }
741    async fn find_best_backend(&self, job_id: &JobId) -> DeviceResult<Option<HardwareBackend>> {
742        {
743            let jobs = self
744                .jobs
745                .read()
746                .expect("Failed to acquire read lock on jobs in find_best_backend");
747            let _job_any = jobs
748                .get(job_id)
749                .ok_or_else(|| DeviceError::APIError("Job not found".to_string()))?;
750        }
751        let job_resource_requirements: Option<ResourceRequirements> = {
752            let config_map = self
753                .job_config_map
754                .read()
755                .expect("Failed to acquire read lock on job_config_map in find_best_backend");
756            config_map
757                .get(job_id)
758                .map(|cfg| cfg.resource_requirements.clone())
759        };
760        let backends: Vec<_> = {
761            let backends = self
762                .backends
763                .read()
764                .expect("Failed to acquire read lock on backends in find_best_backend");
765            backends.iter().copied().collect()
766        };
767        let allocation_strategy = {
768            let params = self
769                .params
770                .read()
771                .expect("Failed to acquire read lock on params in find_best_backend");
772            params.allocation_strategy.clone()
773        };
774        let backend_performance_snapshot = {
775            let backend_performance = self
776                .backend_performance
777                .read()
778                .expect("Failed to acquire read lock on backend_performance in find_best_backend");
779            backend_performance.clone()
780        };
781        match allocation_strategy {
782            AllocationStrategy::FirstFit => {
783                for backend in backends {
784                    if self.is_backend_available(backend).await {
785                        return Ok(Some(backend));
786                    }
787                }
788            }
789            AllocationStrategy::BestFit => {
790                let (capacity_snapshot, required_qubits) = {
791                    let resource_manager = self.resource_manager.lock().expect(
792                        "Failed to acquire lock on resource_manager in find_best_backend BestFit",
793                    );
794                    let snapshot: HashMap<HardwareBackend, usize> = resource_manager
795                        .available_resources
796                        .iter()
797                        .map(|(&b, cap)| (b, cap.qubits))
798                        .collect();
799                    let req = job_resource_requirements
800                        .as_ref()
801                        .map_or(1, |r| r.min_qubits);
802                    (snapshot, req)
803                };
804                let mut best_backend: Option<HardwareBackend> = None;
805                let mut best_excess = usize::MAX;
806                for &backend in &backends {
807                    if !self.is_backend_available(backend).await {
808                        continue;
809                    }
810                    if let Some(&cap_qubits) = capacity_snapshot.get(&backend) {
811                        if cap_qubits >= required_qubits {
812                            let excess = cap_qubits - required_qubits;
813                            if excess < best_excess {
814                                best_excess = excess;
815                                best_backend = Some(backend);
816                            }
817                        }
818                    }
819                }
820                return Ok(best_backend);
821            }
822            AllocationStrategy::LeastLoaded => {
823                let mut best_backend = None;
824                let mut lowest_utilization = f64::INFINITY;
825                for (&backend, perf) in &backend_performance_snapshot {
826                    if self.is_backend_available(backend).await
827                        && perf.utilization < lowest_utilization
828                    {
829                        lowest_utilization = perf.utilization;
830                        best_backend = Some(backend);
831                    }
832                }
833                return Ok(best_backend);
834            }
835            _ => {
836                for &backend in &backends {
837                    if self.is_backend_available(backend).await {
838                        return Ok(Some(backend));
839                    }
840                }
841            }
842        }
843        Ok(None)
844    }
845    async fn is_backend_available(&self, backend: HardwareBackend) -> bool {
846        let available = {
847            let running_jobs = self
848                .running_jobs
849                .read()
850                .expect("Failed to acquire read lock on running_jobs in is_backend_available");
851            let backend_jobs = running_jobs.values().filter(|(b, _)| *b == backend).count();
852            drop(running_jobs);
853            let resource_manager = self
854                .resource_manager
855                .lock()
856                .expect("Failed to acquire lock on resource_manager in is_backend_available");
857            let result = resource_manager
858                .available_resources
859                .get(&backend)
860                .is_some_and(|capacity| backend_jobs < capacity.concurrent_jobs);
861            drop(resource_manager);
862            result
863        };
864        available
865    }
866    async fn assign_job_to_backend(
867        &self,
868        job_id: &JobId,
869        backend: HardwareBackend,
870    ) -> DeviceResult<()> {
871        {
872            let mut running_jobs = self
873                .running_jobs
874                .write()
875                .expect("Failed to acquire write lock on running_jobs in assign_job_to_backend");
876            running_jobs.insert(job_id.clone(), (backend, SystemTime::now()));
877        }
878        self.update_job_status(job_id, JobStatus::Scheduled).await?;
879        let _ = self
880            .event_sender
881            .send(SchedulerEvent::JobScheduled(job_id.clone(), backend));
882        let job_id_clone = job_id.clone();
883        let scheduler = Arc::new(self.clone());
884        tokio::spawn(async move {
885            let _ = scheduler.execute_job(&job_id_clone, backend).await;
886        });
887        Ok(())
888    }
889    async fn execute_job(&self, job_id: &JobId, backend: HardwareBackend) -> DeviceResult<()> {
890        self.update_job_status(job_id, JobStatus::Running).await?;
891        let _ = self
892            .event_sender
893            .send(SchedulerEvent::JobStarted(job_id.clone()));
894        let execution_start = SystemTime::now();
895        {
896            let backends = self
897                .backends
898                .read()
899                .expect("Failed to acquire read lock on backends in execute_job");
900            if !backends.contains(&backend) {
901                return Err(DeviceError::APIError("Backend not found".to_string()));
902            }
903        }
904        let job_config = {
905            let config_map = self
906                .job_config_map
907                .read()
908                .expect("Failed to acquire read lock on job_config_map in execute_job");
909            config_map.get(job_id).cloned()
910        };
911        let queue_time = {
912            let jobs = self
913                .jobs
914                .read()
915                .expect("Failed to acquire read lock on jobs in execute_job queue_time");
916            macro_rules! try_submitted_at {
917                ($n:expr) => {
918                    jobs.get(job_id)
919                        .and_then(|b| b.downcast_ref::<super::types::QuantumJob<$n>>())
920                        .map(|j| {
921                            execution_start
922                                .duration_since(j.submitted_at)
923                                .unwrap_or(Duration::from_secs(0))
924                        })
925                };
926            }
927            try_submitted_at!(1)
928                .or_else(|| try_submitted_at!(2))
929                .or_else(|| try_submitted_at!(4))
930                .or_else(|| try_submitted_at!(8))
931                .or_else(|| try_submitted_at!(16))
932                .or_else(|| try_submitted_at!(32))
933                .or_else(|| try_submitted_at!(64))
934                .or_else(|| try_submitted_at!(128))
935                .unwrap_or(Duration::from_secs(0))
936        };
937        {
938            let status_map = self.job_status_map.read().expect(
939                "Failed to acquire read lock on job_status_map in execute_job cancellation check",
940            );
941            if status_map.get(job_id) == Some(&JobStatus::Cancelled) {
942                return Ok(());
943            }
944        }
945        let simulated_execution_time = job_config
946            .as_ref()
947            .map(|_cfg| Duration::from_secs(1))
948            .unwrap_or(Duration::from_secs(1));
949        tokio::time::sleep(simulated_execution_time).await;
950        {
951            let mut running_jobs = self
952                .running_jobs
953                .write()
954                .expect("Failed to acquire write lock on running_jobs in execute_job cleanup");
955            running_jobs.remove(job_id);
956        }
957        self.update_job_status(job_id, JobStatus::Completed).await?;
958        let execution_end = SystemTime::now();
959        let execution_time = execution_end
960            .duration_since(execution_start)
961            .unwrap_or(Duration::from_secs(0));
962        let metrics = ExecutionMetrics {
963            queue_time,
964            execution_time: Some(execution_time),
965            resource_utilization: 1.0,
966            cost: job_config.as_ref().and_then(|c| c.cost_limit),
967            quality_metrics: {
968                let mut m = HashMap::new();
969                m.insert(
970                    "execution_time_secs".to_string(),
971                    execution_time.as_secs_f64(),
972                );
973                m.insert("queue_time_secs".to_string(), queue_time.as_secs_f64());
974                m
975            },
976        };
977        {
978            let mut metrics_map = self
979                .job_metrics_map
980                .write()
981                .expect("Failed to acquire write lock on job_metrics_map in execute_job");
982            metrics_map.insert(job_id.clone(), metrics.clone());
983        }
984        {
985            let mut perf_map = self
986                .backend_performance
987                .write()
988                .expect("Failed to acquire write lock on backend_performance in execute_job");
989            if let Some(perf) = perf_map.get_mut(&backend) {
990                let alpha = 0.2_f64;
991                let prev_exec_secs = perf.avg_execution_time.as_secs_f64();
992                let new_exec_secs =
993                    (1.0 - alpha) * prev_exec_secs + alpha * execution_time.as_secs_f64();
994                perf.avg_execution_time = Duration::from_secs_f64(new_exec_secs.max(0.0));
995                let prev_queue_secs = perf.avg_queue_time.as_secs_f64();
996                let new_queue_secs =
997                    (1.0 - alpha) * prev_queue_secs + alpha * queue_time.as_secs_f64();
998                perf.avg_queue_time = Duration::from_secs_f64(new_queue_secs.max(0.0));
999                perf.success_rate = (1.0 - alpha) * perf.success_rate + alpha * 1.0;
1000                perf.last_updated = execution_end;
1001            }
1002        }
1003        {
1004            let mut history = self
1005                .execution_history
1006                .write()
1007                .expect("Failed to acquire write lock on execution_history in execute_job");
1008            history.push(JobExecution {
1009                attempt: 1,
1010                backend,
1011                started_at: execution_start,
1012                ended_at: Some(execution_end),
1013                result: None,
1014                error: None,
1015                metrics,
1016            });
1017        }
1018        Ok(())
1019    }
1020    async fn performance_monitoring_loop(&self) {
1021        while *self
1022            .scheduler_running
1023            .lock()
1024            .expect("Failed to acquire lock on scheduler_running in performance_monitoring_loop")
1025        {
1026            self.update_backend_performance().await;
1027            tokio::time::sleep(Duration::from_secs(30)).await;
1028        }
1029    }
1030    async fn update_backend_performance(&self) {
1031        let mut backend_performance = self.backend_performance.write().expect(
1032            "Failed to acquire write lock on backend_performance in update_backend_performance",
1033        );
1034        let now = SystemTime::now();
1035        for (backend, perf) in backend_performance.iter_mut() {
1036            perf.last_updated = now;
1037            let snapshot = PerformanceSnapshot {
1038                timestamp: now,
1039                queue_length: perf.queue_length,
1040                utilization: perf.utilization,
1041                avg_queue_time_secs: perf.avg_queue_time.as_secs_f64(),
1042                success_rate: perf.success_rate,
1043            };
1044            perf.history.push_back(snapshot);
1045            let cutoff = now - Duration::from_secs(86400);
1046            while let Some(front) = perf.history.front() {
1047                if front.timestamp < cutoff {
1048                    perf.history.pop_front();
1049                } else {
1050                    break;
1051                }
1052            }
1053        }
1054    }
1055    async fn scirs2_optimization_loop(&self) {
1056        let frequency = {
1057            let params = self
1058                .params
1059                .read()
1060                .expect("Failed to acquire read lock on params in scirs2_optimization_loop");
1061            params.scirs2_params.optimization_frequency
1062        };
1063        loop {
1064            let should_continue = *self
1065                .scheduler_running
1066                .lock()
1067                .expect("Failed to acquire lock on scheduler_running in scirs2_optimization_loop");
1068            if !should_continue {
1069                break;
1070            }
1071            if let Err(e) = self.run_scirs2_optimization().await {
1072                eprintln!("SciRS2 optimization error: {e}");
1073            }
1074            tokio::time::sleep(frequency).await;
1075        }
1076    }
1077    async fn run_scirs2_optimization(&self) -> DeviceResult<()> {
1078        #[cfg(feature = "scirs2")]
1079        {
1080            let backend_snapshot: Vec<(HardwareBackend, f64)> = {
1081                let bp = self.backend_performance.read().expect(
1082                    "Failed to acquire read lock on backend_performance in run_scirs2_optimization",
1083                );
1084                bp.iter().map(|(&b, p)| (b, p.utilization)).collect()
1085            };
1086            let performance_data: Vec<f64> = backend_snapshot.iter().map(|(_, u)| *u).collect();
1087            if performance_data.len() > 1 {
1088                use scirs2_core::ndarray::Array1;
1089                let data_array = Array1::from_vec(performance_data);
1090                let avg_utilization: f64 = mean(&data_array.view()).unwrap_or(0.5);
1091                let utilization_std: f64 = stats_std(&data_array.view(), 1, None).unwrap_or(0.1);
1092                let overload_threshold = avg_utilization + utilization_std;
1093                let underload_threshold = (avg_utilization - utilization_std).max(0.0);
1094                let overloaded: Vec<HardwareBackend> = backend_snapshot
1095                    .iter()
1096                    .filter(|(_, u)| *u > overload_threshold)
1097                    .map(|(b, _)| *b)
1098                    .collect();
1099                let underloaded: Vec<(HardwareBackend, f64)> = backend_snapshot
1100                    .iter()
1101                    .filter(|(_, u)| *u < underload_threshold)
1102                    .copied()
1103                    .collect();
1104                if underloaded.is_empty() {
1105                    return Ok(());
1106                }
1107                for overloaded_backend in overloaded {
1108                    let target_backend = underloaded
1109                        .iter()
1110                        .min_by(|(_, ua), (_, ub)| {
1111                            ua.partial_cmp(ub).unwrap_or(std::cmp::Ordering::Equal)
1112                        })
1113                        .map(|(b, _)| *b)
1114                        .unwrap_or(underloaded[0].0);
1115                    let candidate_job_id: Option<JobId> = {
1116                        let queues = self
1117                            .job_queues
1118                            .lock()
1119                            .expect("Failed to acquire lock on job_queues in load balancing");
1120                        let config_map = self
1121                            .job_config_map
1122                            .read()
1123                            .expect("Failed to read job_config_map in load balancing");
1124                        let mut found = None;
1125                        'outer: for queue in queues.values() {
1126                            for job_id in queue.iter() {
1127                                if let Some(cfg) = config_map.get(job_id) {
1128                                    if cfg.preferred_backends.first() == Some(&overloaded_backend) {
1129                                        found = Some(job_id.clone());
1130                                        break 'outer;
1131                                    }
1132                                }
1133                            }
1134                        }
1135                        found
1136                    };
1137                    if let Some(job_id) = candidate_job_id {
1138                        let mut config_map = self
1139                            .job_config_map
1140                            .write()
1141                            .expect("Failed to write job_config_map in load balancing");
1142                        if let Some(cfg) = config_map.get_mut(&job_id) {
1143                            cfg.preferred_backends.insert(0, target_backend);
1144                        }
1145                    }
1146                }
1147            }
1148        }
1149        Ok(())
1150    }
1151}