quantrs2_sim/
realtime_hardware_integration.rs

1//! Real-Time Hardware Integration for Cloud Quantum Computers
2//!
3//! This module provides real-time integration capabilities with cloud quantum hardware,
4//! including live job monitoring, streaming results, dynamic calibration tracking,
5//! and real-time error rate monitoring. It enables responsive quantum-classical
6//! hybrid algorithms with immediate feedback from quantum hardware.
7//!
8//! # Features
9//! - Real-time job status monitoring with callbacks
10//! - Streaming measurement results for iterative algorithms
11//! - Dynamic hardware calibration tracking
12//! - Live error rate monitoring and adaptation
13//! - WebSocket-based event streaming (simulated)
14//! - Circuit execution progress tracking
15//! - Hardware availability notifications
16
17use quantrs2_core::error::{QuantRS2Error, QuantRS2Result};
18use scirs2_core::ndarray::Array1;
19use scirs2_core::Complex64;
20use serde::{Deserialize, Serialize};
21use std::collections::{HashMap, VecDeque};
22use std::sync::{Arc, Mutex, RwLock};
23use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
24
25/// Real-time hardware integration manager
26#[derive(Debug)]
27pub struct RealtimeHardwareManager {
28    /// Active hardware connections
29    connections: Arc<RwLock<HashMap<String, HardwareConnection>>>,
30    /// Job monitor for tracking execution
31    job_monitor: JobMonitor,
32    /// Calibration tracker
33    calibration_tracker: CalibrationTracker,
34    /// Event stream for real-time updates
35    event_stream: EventStream,
36    /// Configuration
37    config: RealtimeConfig,
38    /// Statistics
39    stats: Arc<Mutex<RealtimeStats>>,
40}
41
42/// Configuration for real-time hardware integration
43#[derive(Debug, Clone, Serialize, Deserialize)]
44pub struct RealtimeConfig {
45    /// Polling interval for job status (milliseconds)
46    pub polling_interval_ms: u64,
47    /// Enable streaming results
48    pub enable_streaming: bool,
49    /// Maximum event buffer size
50    pub max_event_buffer: usize,
51    /// Calibration update interval (seconds)
52    pub calibration_update_interval: u64,
53    /// Enable adaptive error mitigation
54    pub enable_adaptive_mitigation: bool,
55    /// Timeout for real-time operations (seconds)
56    pub operation_timeout: u64,
57    /// Enable hardware availability notifications
58    pub enable_availability_notifications: bool,
59    /// Maximum concurrent jobs
60    pub max_concurrent_jobs: usize,
61}
62
63impl Default for RealtimeConfig {
64    fn default() -> Self {
65        Self {
66            polling_interval_ms: 500,
67            enable_streaming: true,
68            max_event_buffer: 1000,
69            calibration_update_interval: 300,
70            enable_adaptive_mitigation: true,
71            operation_timeout: 3600,
72            enable_availability_notifications: true,
73            max_concurrent_jobs: 10,
74        }
75    }
76}
77
78/// Hardware connection state
79#[derive(Debug, Clone)]
80pub struct HardwareConnection {
81    /// Connection ID
82    pub id: String,
83    /// Hardware provider
84    pub provider: HardwareProvider,
85    /// Connection status
86    pub status: ConnectionStatus,
87    /// Backend name
88    pub backend: String,
89    /// Connection timestamp
90    pub connected_at: u64,
91    /// Last heartbeat
92    pub last_heartbeat: u64,
93    /// Current calibration data
94    pub calibration: Option<CalibrationData>,
95}
96
97/// Hardware provider types
98#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
99pub enum HardwareProvider {
100    IBMQuantum,
101    GoogleQuantumAI,
102    AmazonBraket,
103    AzureQuantum,
104    IonQ,
105    Rigetti,
106    Xanadu,
107    Pasqal,
108}
109
110/// Connection status
111#[derive(Debug, Clone, Copy, PartialEq, Eq)]
112pub enum ConnectionStatus {
113    Connected,
114    Connecting,
115    Disconnected,
116    Error,
117    Maintenance,
118}
119
120/// Job monitor for tracking execution
121pub struct JobMonitor {
122    /// Active jobs being monitored
123    active_jobs: Arc<RwLock<HashMap<String, JobState>>>,
124    /// Job history
125    job_history: Arc<Mutex<VecDeque<JobRecord>>>,
126    /// Callback handlers for job events
127    callbacks: Arc<Mutex<HashMap<String, Vec<Box<dyn Fn(&JobEvent) + Send + Sync>>>>>,
128}
129
130impl std::fmt::Debug for JobMonitor {
131    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
132        f.debug_struct("JobMonitor")
133            .field("active_jobs", &"<jobs>")
134            .field("job_history", &"<history>")
135            .field("callbacks", &"<callbacks>")
136            .finish()
137    }
138}
139
140/// State of a monitored job
141#[derive(Debug, Clone)]
142pub struct JobState {
143    /// Job ID
144    pub job_id: String,
145    /// Current status
146    pub status: JobStatus,
147    /// Progress (0.0 - 1.0)
148    pub progress: f64,
149    /// Start time
150    pub start_time: Instant,
151    /// Estimated completion time
152    pub estimated_completion: Option<Duration>,
153    /// Partial results (for streaming)
154    pub partial_results: Vec<PartialResult>,
155    /// Error information
156    pub error_info: Option<String>,
157    /// Queue position
158    pub queue_position: Option<usize>,
159}
160
161/// Job status
162#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
163pub enum JobStatus {
164    Queued,
165    Running,
166    Completed,
167    Failed,
168    Cancelled,
169    TimedOut,
170}
171
172/// Partial result for streaming
173#[derive(Debug, Clone)]
174pub struct PartialResult {
175    /// Result index
176    pub index: usize,
177    /// Measurement counts
178    pub counts: HashMap<String, usize>,
179    /// Timestamp
180    pub timestamp: u64,
181}
182
183/// Job event for callbacks
184#[derive(Debug, Clone)]
185pub struct JobEvent {
186    /// Event type
187    pub event_type: JobEventType,
188    /// Job ID
189    pub job_id: String,
190    /// Event data
191    pub data: JobEventData,
192    /// Timestamp
193    pub timestamp: u64,
194}
195
196/// Types of job events
197#[derive(Debug, Clone, Copy, PartialEq, Eq)]
198pub enum JobEventType {
199    StatusChanged,
200    ProgressUpdate,
201    PartialResult,
202    Completed,
203    Failed,
204    QueuePositionChanged,
205}
206
207/// Job event data
208#[derive(Debug, Clone)]
209pub enum JobEventData {
210    Status(JobStatus),
211    Progress(f64),
212    Result(PartialResult),
213    Error(String),
214    QueuePosition(usize),
215    None,
216}
217
218/// Job record for history
219#[derive(Debug, Clone)]
220pub struct JobRecord {
221    /// Job ID
222    pub job_id: String,
223    /// Final status
224    pub status: JobStatus,
225    /// Start time
226    pub start_time: u64,
227    /// End time
228    pub end_time: u64,
229    /// Total shots
230    pub total_shots: usize,
231    /// Backend used
232    pub backend: String,
233}
234
235/// Calibration tracker for hardware
236#[derive(Debug)]
237pub struct CalibrationTracker {
238    /// Current calibration data by backend
239    calibrations: Arc<RwLock<HashMap<String, CalibrationData>>>,
240    /// Calibration history
241    history: Arc<Mutex<HashMap<String, VecDeque<CalibrationSnapshot>>>>,
242    /// Last update times
243    last_updates: Arc<Mutex<HashMap<String, Instant>>>,
244}
245
246/// Hardware calibration data
247#[derive(Debug, Clone, Serialize, Deserialize)]
248pub struct CalibrationData {
249    /// Backend name
250    pub backend: String,
251    /// Timestamp
252    pub timestamp: u64,
253    /// Single-qubit gate errors by qubit
254    pub single_qubit_errors: HashMap<usize, f64>,
255    /// Two-qubit gate errors by qubit pair
256    pub two_qubit_errors: HashMap<(usize, usize), f64>,
257    /// Readout errors by qubit
258    pub readout_errors: HashMap<usize, f64>,
259    /// T1 times by qubit (microseconds)
260    pub t1_times: HashMap<usize, f64>,
261    /// T2 times by qubit (microseconds)
262    pub t2_times: HashMap<usize, f64>,
263    /// Gate durations (nanoseconds)
264    pub gate_durations: HashMap<String, f64>,
265    /// Connectivity graph
266    pub connectivity: Vec<(usize, usize)>,
267}
268
269/// Calibration snapshot for history
270#[derive(Debug, Clone)]
271pub struct CalibrationSnapshot {
272    /// Timestamp
273    pub timestamp: u64,
274    /// Average single-qubit error
275    pub avg_single_qubit_error: f64,
276    /// Average two-qubit error
277    pub avg_two_qubit_error: f64,
278    /// Average readout error
279    pub avg_readout_error: f64,
280}
281
282/// Event stream for real-time updates
283pub struct EventStream {
284    /// Event buffer
285    buffer: Arc<Mutex<VecDeque<HardwareEvent>>>,
286    /// Maximum buffer size
287    max_size: usize,
288    /// Event subscribers
289    subscribers: Arc<Mutex<Vec<Box<dyn Fn(&HardwareEvent) + Send + Sync>>>>,
290}
291
292impl std::fmt::Debug for EventStream {
293    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
294        f.debug_struct("EventStream")
295            .field("buffer", &"<buffer>")
296            .field("max_size", &self.max_size)
297            .field("subscribers", &"<subscribers>")
298            .finish()
299    }
300}
301
302/// Hardware events
303#[derive(Debug, Clone)]
304pub struct HardwareEvent {
305    /// Event type
306    pub event_type: HardwareEventType,
307    /// Source backend
308    pub backend: String,
309    /// Event data
310    pub data: HardwareEventData,
311    /// Timestamp
312    pub timestamp: u64,
313}
314
315/// Types of hardware events
316#[derive(Debug, Clone, Copy, PartialEq, Eq)]
317pub enum HardwareEventType {
318    CalibrationUpdated,
319    AvailabilityChanged,
320    ErrorRateAlert,
321    MaintenanceScheduled,
322    JobQueued,
323    JobStarted,
324    JobCompleted,
325}
326
327/// Hardware event data
328#[derive(Debug, Clone)]
329pub enum HardwareEventData {
330    Calibration(CalibrationData),
331    Availability(bool),
332    ErrorRate(f64),
333    Maintenance { start: u64, end: u64 },
334    JobInfo { job_id: String, shots: usize },
335    None,
336}
337
338/// Statistics for real-time operations
339#[derive(Debug, Clone, Default)]
340pub struct RealtimeStats {
341    /// Total jobs monitored
342    pub jobs_monitored: u64,
343    /// Jobs completed successfully
344    pub jobs_completed: u64,
345    /// Jobs failed
346    pub jobs_failed: u64,
347    /// Total events processed
348    pub events_processed: u64,
349    /// Calibration updates received
350    pub calibration_updates: u64,
351    /// Average job completion time
352    pub avg_completion_time: Duration,
353    /// Current active connections
354    pub active_connections: usize,
355}
356
357impl RealtimeHardwareManager {
358    /// Create a new real-time hardware manager
359    pub fn new(config: RealtimeConfig) -> Self {
360        Self {
361            connections: Arc::new(RwLock::new(HashMap::new())),
362            job_monitor: JobMonitor::new(),
363            calibration_tracker: CalibrationTracker::new(),
364            event_stream: EventStream::new(config.max_event_buffer),
365            config,
366            stats: Arc::new(Mutex::new(RealtimeStats::default())),
367        }
368    }
369
370    /// Connect to hardware backend
371    pub fn connect(&mut self, provider: HardwareProvider, backend: &str) -> QuantRS2Result<String> {
372        let conn_id = format!("{provider:?}_{backend}");
373        let now = SystemTime::now()
374            .duration_since(UNIX_EPOCH)
375            .unwrap_or_default()
376            .as_secs();
377
378        let connection = HardwareConnection {
379            id: conn_id.clone(),
380            provider,
381            status: ConnectionStatus::Connected,
382            backend: backend.to_string(),
383            connected_at: now,
384            last_heartbeat: now,
385            calibration: None,
386        };
387
388        let mut connections = self.connections.write().map_err(|_| {
389            QuantRS2Error::InvalidInput("Failed to acquire connections lock".to_string())
390        })?;
391        connections.insert(conn_id.clone(), connection);
392
393        // Update stats
394        let mut stats = self
395            .stats
396            .lock()
397            .map_err(|_| QuantRS2Error::InvalidInput("Failed to acquire stats lock".to_string()))?;
398        stats.active_connections += 1;
399
400        Ok(conn_id)
401    }
402
403    /// Disconnect from hardware backend
404    pub fn disconnect(&mut self, connection_id: &str) -> QuantRS2Result<()> {
405        let mut connections = self.connections.write().map_err(|_| {
406            QuantRS2Error::InvalidInput("Failed to acquire connections lock".to_string())
407        })?;
408
409        if connections.remove(connection_id).is_some() {
410            let mut stats = self.stats.lock().map_err(|_| {
411                QuantRS2Error::InvalidInput("Failed to acquire stats lock".to_string())
412            })?;
413            if stats.active_connections > 0 {
414                stats.active_connections -= 1;
415            }
416        }
417
418        Ok(())
419    }
420
421    /// Submit job for real-time monitoring
422    pub fn submit_job(&mut self, job_id: &str, connection_id: &str) -> QuantRS2Result<()> {
423        let connections = self.connections.read().map_err(|_| {
424            QuantRS2Error::InvalidInput("Failed to acquire connections lock".to_string())
425        })?;
426
427        if !connections.contains_key(connection_id) {
428            return Err(QuantRS2Error::InvalidInput(format!(
429                "Connection {connection_id} not found"
430            )));
431        }
432
433        // Create job state
434        let job_state = JobState {
435            job_id: job_id.to_string(),
436            status: JobStatus::Queued,
437            progress: 0.0,
438            start_time: Instant::now(),
439            estimated_completion: None,
440            partial_results: Vec::new(),
441            error_info: None,
442            queue_position: Some(1),
443        };
444
445        self.job_monitor.add_job(job_state)?;
446
447        // Update stats
448        let mut stats = self
449            .stats
450            .lock()
451            .map_err(|_| QuantRS2Error::InvalidInput("Failed to acquire stats lock".to_string()))?;
452        stats.jobs_monitored += 1;
453
454        Ok(())
455    }
456
457    /// Get job status
458    pub fn get_job_status(&self, job_id: &str) -> QuantRS2Result<JobStatus> {
459        self.job_monitor.get_status(job_id)
460    }
461
462    /// Get job progress
463    pub fn get_job_progress(&self, job_id: &str) -> QuantRS2Result<f64> {
464        self.job_monitor.get_progress(job_id)
465    }
466
467    /// Update job status (simulates receiving update from hardware)
468    pub fn update_job_status(
469        &mut self,
470        job_id: &str,
471        status: JobStatus,
472        progress: f64,
473    ) -> QuantRS2Result<()> {
474        self.job_monitor.update_status(job_id, status, progress)?;
475
476        // Update stats if completed
477        if status == JobStatus::Completed {
478            let mut stats = self.stats.lock().map_err(|_| {
479                QuantRS2Error::InvalidInput("Failed to acquire stats lock".to_string())
480            })?;
481            stats.jobs_completed += 1;
482        } else if status == JobStatus::Failed {
483            let mut stats = self.stats.lock().map_err(|_| {
484                QuantRS2Error::InvalidInput("Failed to acquire stats lock".to_string())
485            })?;
486            stats.jobs_failed += 1;
487        }
488
489        Ok(())
490    }
491
492    /// Add partial result for streaming
493    pub fn add_partial_result(
494        &mut self,
495        job_id: &str,
496        counts: HashMap<String, usize>,
497    ) -> QuantRS2Result<()> {
498        self.job_monitor.add_partial_result(job_id, counts)
499    }
500
501    /// Get partial results for job
502    pub fn get_partial_results(&self, job_id: &str) -> QuantRS2Result<Vec<PartialResult>> {
503        self.job_monitor.get_partial_results(job_id)
504    }
505
506    /// Update calibration data for backend
507    pub fn update_calibration(
508        &mut self,
509        backend: &str,
510        calibration: CalibrationData,
511    ) -> QuantRS2Result<()> {
512        self.calibration_tracker
513            .update_calibration(backend, calibration)?;
514
515        // Update stats
516        let mut stats = self
517            .stats
518            .lock()
519            .map_err(|_| QuantRS2Error::InvalidInput("Failed to acquire stats lock".to_string()))?;
520        stats.calibration_updates += 1;
521
522        Ok(())
523    }
524
525    /// Get current calibration for backend
526    pub fn get_calibration(&self, backend: &str) -> QuantRS2Result<Option<CalibrationData>> {
527        self.calibration_tracker.get_calibration(backend)
528    }
529
530    /// Get optimal qubits based on current calibration
531    pub fn get_optimal_qubits(
532        &self,
533        backend: &str,
534        num_qubits: usize,
535    ) -> QuantRS2Result<Vec<usize>> {
536        let calibration = self.get_calibration(backend)?;
537
538        match calibration {
539            Some(cal) => {
540                // Sort qubits by error rate (lowest first)
541                let mut qubits: Vec<(usize, f64)> = cal
542                    .single_qubit_errors
543                    .iter()
544                    .map(|(&q, &e)| (q, e))
545                    .collect();
546                qubits.sort_by(|a, b| a.1.partial_cmp(&b.1).unwrap_or(std::cmp::Ordering::Equal));
547
548                Ok(qubits
549                    .into_iter()
550                    .take(num_qubits)
551                    .map(|(q, _)| q)
552                    .collect())
553            }
554            None => {
555                // No calibration data, return sequential qubits
556                Ok((0..num_qubits).collect())
557            }
558        }
559    }
560
561    /// Get statistics
562    pub fn get_stats(&self) -> QuantRS2Result<RealtimeStats> {
563        let stats = self
564            .stats
565            .lock()
566            .map_err(|_| QuantRS2Error::InvalidInput("Failed to acquire stats lock".to_string()))?;
567        Ok(stats.clone())
568    }
569
570    /// Get active connections
571    pub fn get_connections(&self) -> QuantRS2Result<Vec<HardwareConnection>> {
572        let connections = self.connections.read().map_err(|_| {
573            QuantRS2Error::InvalidInput("Failed to acquire connections lock".to_string())
574        })?;
575        Ok(connections.values().cloned().collect())
576    }
577
578    /// Register job event callback
579    pub fn register_job_callback<F>(&mut self, job_id: &str, callback: F) -> QuantRS2Result<()>
580    where
581        F: Fn(&JobEvent) + Send + Sync + 'static,
582    {
583        self.job_monitor.register_callback(job_id, callback)
584    }
585
586    /// Check if backend is available
587    pub fn is_backend_available(&self, connection_id: &str) -> QuantRS2Result<bool> {
588        let connections = self.connections.read().map_err(|_| {
589            QuantRS2Error::InvalidInput("Failed to acquire connections lock".to_string())
590        })?;
591
592        match connections.get(connection_id) {
593            Some(conn) => Ok(conn.status == ConnectionStatus::Connected),
594            None => Ok(false),
595        }
596    }
597}
598
599impl JobMonitor {
600    /// Create new job monitor
601    fn new() -> Self {
602        Self {
603            active_jobs: Arc::new(RwLock::new(HashMap::new())),
604            job_history: Arc::new(Mutex::new(VecDeque::new())),
605            callbacks: Arc::new(Mutex::new(HashMap::new())),
606        }
607    }
608
609    /// Add job for monitoring
610    fn add_job(&self, job_state: JobState) -> QuantRS2Result<()> {
611        let mut jobs = self
612            .active_jobs
613            .write()
614            .map_err(|_| QuantRS2Error::InvalidInput("Failed to acquire jobs lock".to_string()))?;
615        jobs.insert(job_state.job_id.clone(), job_state);
616        Ok(())
617    }
618
619    /// Get job status
620    fn get_status(&self, job_id: &str) -> QuantRS2Result<JobStatus> {
621        let jobs = self
622            .active_jobs
623            .read()
624            .map_err(|_| QuantRS2Error::InvalidInput("Failed to acquire jobs lock".to_string()))?;
625
626        jobs.get(job_id)
627            .map(|j| j.status)
628            .ok_or_else(|| QuantRS2Error::InvalidInput(format!("Job {job_id} not found")))
629    }
630
631    /// Get job progress
632    fn get_progress(&self, job_id: &str) -> QuantRS2Result<f64> {
633        let jobs = self
634            .active_jobs
635            .read()
636            .map_err(|_| QuantRS2Error::InvalidInput("Failed to acquire jobs lock".to_string()))?;
637
638        jobs.get(job_id)
639            .map(|j| j.progress)
640            .ok_or_else(|| QuantRS2Error::InvalidInput(format!("Job {job_id} not found")))
641    }
642
643    /// Update job status
644    fn update_status(&self, job_id: &str, status: JobStatus, progress: f64) -> QuantRS2Result<()> {
645        let mut jobs = self
646            .active_jobs
647            .write()
648            .map_err(|_| QuantRS2Error::InvalidInput("Failed to acquire jobs lock".to_string()))?;
649
650        if let Some(job) = jobs.get_mut(job_id) {
651            job.status = status;
652            job.progress = progress;
653
654            // Trigger callbacks
655            self.trigger_callback(
656                job_id,
657                JobEventType::StatusChanged,
658                JobEventData::Status(status),
659            )?;
660            self.trigger_callback(
661                job_id,
662                JobEventType::ProgressUpdate,
663                JobEventData::Progress(progress),
664            )?;
665        }
666
667        Ok(())
668    }
669
670    /// Add partial result
671    fn add_partial_result(
672        &self,
673        job_id: &str,
674        counts: HashMap<String, usize>,
675    ) -> QuantRS2Result<()> {
676        let mut jobs = self
677            .active_jobs
678            .write()
679            .map_err(|_| QuantRS2Error::InvalidInput("Failed to acquire jobs lock".to_string()))?;
680
681        if let Some(job) = jobs.get_mut(job_id) {
682            let result = PartialResult {
683                index: job.partial_results.len(),
684                counts,
685                timestamp: SystemTime::now()
686                    .duration_since(UNIX_EPOCH)
687                    .unwrap_or_default()
688                    .as_secs(),
689            };
690            job.partial_results.push(result.clone());
691
692            // Trigger callback
693            drop(jobs);
694            self.trigger_callback(
695                job_id,
696                JobEventType::PartialResult,
697                JobEventData::Result(result),
698            )?;
699        }
700
701        Ok(())
702    }
703
704    /// Get partial results
705    fn get_partial_results(&self, job_id: &str) -> QuantRS2Result<Vec<PartialResult>> {
706        let jobs = self
707            .active_jobs
708            .read()
709            .map_err(|_| QuantRS2Error::InvalidInput("Failed to acquire jobs lock".to_string()))?;
710
711        jobs.get(job_id)
712            .map(|j| j.partial_results.clone())
713            .ok_or_else(|| QuantRS2Error::InvalidInput(format!("Job {job_id} not found")))
714    }
715
716    /// Register callback for job events
717    fn register_callback<F>(&self, job_id: &str, callback: F) -> QuantRS2Result<()>
718    where
719        F: Fn(&JobEvent) + Send + Sync + 'static,
720    {
721        let mut callbacks = self.callbacks.lock().map_err(|_| {
722            QuantRS2Error::InvalidInput("Failed to acquire callbacks lock".to_string())
723        })?;
724
725        callbacks
726            .entry(job_id.to_string())
727            .or_insert_with(Vec::new)
728            .push(Box::new(callback));
729
730        Ok(())
731    }
732
733    /// Trigger callbacks for event
734    fn trigger_callback(
735        &self,
736        job_id: &str,
737        event_type: JobEventType,
738        data: JobEventData,
739    ) -> QuantRS2Result<()> {
740        let callbacks = self.callbacks.lock().map_err(|_| {
741            QuantRS2Error::InvalidInput("Failed to acquire callbacks lock".to_string())
742        })?;
743
744        if let Some(handlers) = callbacks.get(job_id) {
745            let event = JobEvent {
746                event_type,
747                job_id: job_id.to_string(),
748                data,
749                timestamp: SystemTime::now()
750                    .duration_since(UNIX_EPOCH)
751                    .unwrap_or_default()
752                    .as_secs(),
753            };
754
755            for handler in handlers {
756                handler(&event);
757            }
758        }
759
760        Ok(())
761    }
762}
763
764impl CalibrationTracker {
765    /// Create new calibration tracker
766    fn new() -> Self {
767        Self {
768            calibrations: Arc::new(RwLock::new(HashMap::new())),
769            history: Arc::new(Mutex::new(HashMap::new())),
770            last_updates: Arc::new(Mutex::new(HashMap::new())),
771        }
772    }
773
774    /// Update calibration data
775    fn update_calibration(
776        &self,
777        backend: &str,
778        calibration: CalibrationData,
779    ) -> QuantRS2Result<()> {
780        // Calculate snapshot
781        let avg_single = if calibration.single_qubit_errors.is_empty() {
782            0.0
783        } else {
784            calibration.single_qubit_errors.values().sum::<f64>()
785                / calibration.single_qubit_errors.len() as f64
786        };
787
788        let avg_two = if calibration.two_qubit_errors.is_empty() {
789            0.0
790        } else {
791            calibration.two_qubit_errors.values().sum::<f64>()
792                / calibration.two_qubit_errors.len() as f64
793        };
794
795        let avg_readout = if calibration.readout_errors.is_empty() {
796            0.0
797        } else {
798            calibration.readout_errors.values().sum::<f64>()
799                / calibration.readout_errors.len() as f64
800        };
801
802        let snapshot = CalibrationSnapshot {
803            timestamp: calibration.timestamp,
804            avg_single_qubit_error: avg_single,
805            avg_two_qubit_error: avg_two,
806            avg_readout_error: avg_readout,
807        };
808
809        // Store calibration
810        let mut calibrations = self.calibrations.write().map_err(|_| {
811            QuantRS2Error::InvalidInput("Failed to acquire calibrations lock".to_string())
812        })?;
813        calibrations.insert(backend.to_string(), calibration);
814
815        // Store snapshot in history
816        let mut history = self.history.lock().map_err(|_| {
817            QuantRS2Error::InvalidInput("Failed to acquire history lock".to_string())
818        })?;
819        history
820            .entry(backend.to_string())
821            .or_insert_with(VecDeque::new)
822            .push_back(snapshot);
823
824        // Update last update time
825        let mut last_updates = self.last_updates.lock().map_err(|_| {
826            QuantRS2Error::InvalidInput("Failed to acquire last_updates lock".to_string())
827        })?;
828        last_updates.insert(backend.to_string(), Instant::now());
829
830        Ok(())
831    }
832
833    /// Get current calibration
834    fn get_calibration(&self, backend: &str) -> QuantRS2Result<Option<CalibrationData>> {
835        let calibrations = self.calibrations.read().map_err(|_| {
836            QuantRS2Error::InvalidInput("Failed to acquire calibrations lock".to_string())
837        })?;
838        Ok(calibrations.get(backend).cloned())
839    }
840}
841
842impl EventStream {
843    /// Create new event stream
844    fn new(max_size: usize) -> Self {
845        Self {
846            buffer: Arc::new(Mutex::new(VecDeque::new())),
847            max_size,
848            subscribers: Arc::new(Mutex::new(Vec::new())),
849        }
850    }
851}
852
853#[cfg(test)]
854mod tests {
855    use super::*;
856
857    #[test]
858    fn test_realtime_manager_creation() {
859        let config = RealtimeConfig::default();
860        let manager = RealtimeHardwareManager::new(config);
861        assert!(manager.get_stats().is_ok());
862    }
863
864    #[test]
865    fn test_connect_disconnect() {
866        let config = RealtimeConfig::default();
867        let mut manager = RealtimeHardwareManager::new(config);
868
869        let conn_id = manager
870            .connect(HardwareProvider::IBMQuantum, "ibm_qasm_simulator")
871            .unwrap();
872        assert!(!conn_id.is_empty());
873
874        let connections = manager.get_connections().unwrap();
875        assert_eq!(connections.len(), 1);
876
877        manager.disconnect(&conn_id).unwrap();
878        let connections = manager.get_connections().unwrap();
879        assert_eq!(connections.len(), 0);
880    }
881
882    #[test]
883    fn test_job_monitoring() {
884        let config = RealtimeConfig::default();
885        let mut manager = RealtimeHardwareManager::new(config);
886
887        let conn_id = manager
888            .connect(HardwareProvider::IBMQuantum, "ibm_qasm_simulator")
889            .unwrap();
890
891        manager.submit_job("job_123", &conn_id).unwrap();
892
893        let status = manager.get_job_status("job_123").unwrap();
894        assert_eq!(status, JobStatus::Queued);
895
896        let progress = manager.get_job_progress("job_123").unwrap();
897        assert_eq!(progress, 0.0);
898    }
899
900    #[test]
901    fn test_job_status_update() {
902        let config = RealtimeConfig::default();
903        let mut manager = RealtimeHardwareManager::new(config);
904
905        let conn_id = manager
906            .connect(HardwareProvider::IBMQuantum, "backend")
907            .unwrap();
908
909        manager.submit_job("job_456", &conn_id).unwrap();
910
911        manager
912            .update_job_status("job_456", JobStatus::Running, 0.5)
913            .unwrap();
914        let status = manager.get_job_status("job_456").unwrap();
915        assert_eq!(status, JobStatus::Running);
916
917        let progress = manager.get_job_progress("job_456").unwrap();
918        assert_eq!(progress, 0.5);
919    }
920
921    #[test]
922    fn test_partial_results() {
923        let config = RealtimeConfig::default();
924        let mut manager = RealtimeHardwareManager::new(config);
925
926        let conn_id = manager
927            .connect(HardwareProvider::GoogleQuantumAI, "backend")
928            .unwrap();
929
930        manager.submit_job("job_789", &conn_id).unwrap();
931
932        let mut counts = HashMap::new();
933        counts.insert("00".to_string(), 450);
934        counts.insert("11".to_string(), 550);
935
936        manager.add_partial_result("job_789", counts).unwrap();
937
938        let results = manager.get_partial_results("job_789").unwrap();
939        assert_eq!(results.len(), 1);
940        assert_eq!(results[0].counts.get("00"), Some(&450));
941    }
942
943    #[test]
944    fn test_calibration_tracking() {
945        let config = RealtimeConfig::default();
946        let mut manager = RealtimeHardwareManager::new(config);
947
948        let mut single_qubit_errors = HashMap::new();
949        single_qubit_errors.insert(0, 0.001);
950        single_qubit_errors.insert(1, 0.002);
951        single_qubit_errors.insert(2, 0.0015);
952
953        let calibration = CalibrationData {
954            backend: "test_backend".to_string(),
955            timestamp: 12345,
956            single_qubit_errors,
957            two_qubit_errors: HashMap::new(),
958            readout_errors: HashMap::new(),
959            t1_times: HashMap::new(),
960            t2_times: HashMap::new(),
961            gate_durations: HashMap::new(),
962            connectivity: vec![(0, 1), (1, 2)],
963        };
964
965        manager
966            .update_calibration("test_backend", calibration)
967            .unwrap();
968
969        let cal = manager.get_calibration("test_backend").unwrap();
970        assert!(cal.is_some());
971        assert_eq!(cal.unwrap().single_qubit_errors.len(), 3);
972    }
973
974    #[test]
975    fn test_optimal_qubits() {
976        let config = RealtimeConfig::default();
977        let mut manager = RealtimeHardwareManager::new(config);
978
979        let mut single_qubit_errors = HashMap::new();
980        single_qubit_errors.insert(0, 0.005);
981        single_qubit_errors.insert(1, 0.001);
982        single_qubit_errors.insert(2, 0.003);
983        single_qubit_errors.insert(3, 0.002);
984
985        let calibration = CalibrationData {
986            backend: "backend".to_string(),
987            timestamp: 12345,
988            single_qubit_errors,
989            two_qubit_errors: HashMap::new(),
990            readout_errors: HashMap::new(),
991            t1_times: HashMap::new(),
992            t2_times: HashMap::new(),
993            gate_durations: HashMap::new(),
994            connectivity: vec![],
995        };
996
997        manager.update_calibration("backend", calibration).unwrap();
998
999        let optimal = manager.get_optimal_qubits("backend", 2).unwrap();
1000        assert_eq!(optimal.len(), 2);
1001        // Should return qubits with lowest error rates (1 and 3)
1002        assert!(optimal.contains(&1));
1003        assert!(optimal.contains(&3));
1004    }
1005
1006    #[test]
1007    fn test_backend_availability() {
1008        let config = RealtimeConfig::default();
1009        let mut manager = RealtimeHardwareManager::new(config);
1010
1011        let conn_id = manager
1012            .connect(HardwareProvider::AmazonBraket, "backend")
1013            .unwrap();
1014
1015        assert!(manager.is_backend_available(&conn_id).unwrap());
1016        assert!(!manager.is_backend_available("nonexistent").unwrap());
1017    }
1018
1019    #[test]
1020    fn test_statistics() {
1021        let config = RealtimeConfig::default();
1022        let mut manager = RealtimeHardwareManager::new(config);
1023
1024        let conn_id = manager.connect(HardwareProvider::IonQ, "backend").unwrap();
1025
1026        manager.submit_job("job_a", &conn_id).unwrap();
1027        manager.submit_job("job_b", &conn_id).unwrap();
1028
1029        manager
1030            .update_job_status("job_a", JobStatus::Completed, 1.0)
1031            .unwrap();
1032        manager
1033            .update_job_status("job_b", JobStatus::Failed, 0.5)
1034            .unwrap();
1035
1036        let stats = manager.get_stats().unwrap();
1037        assert_eq!(stats.jobs_monitored, 2);
1038        assert_eq!(stats.jobs_completed, 1);
1039        assert_eq!(stats.jobs_failed, 1);
1040    }
1041
1042    #[test]
1043    fn test_config_defaults() {
1044        let config = RealtimeConfig::default();
1045
1046        assert_eq!(config.polling_interval_ms, 500);
1047        assert!(config.enable_streaming);
1048        assert_eq!(config.max_event_buffer, 1000);
1049        assert!(config.enable_adaptive_mitigation);
1050        assert_eq!(config.max_concurrent_jobs, 10);
1051    }
1052
1053    #[test]
1054    fn test_multiple_providers() {
1055        let config = RealtimeConfig::default();
1056        let mut manager = RealtimeHardwareManager::new(config);
1057
1058        manager
1059            .connect(HardwareProvider::IBMQuantum, "ibm_backend")
1060            .unwrap();
1061        manager
1062            .connect(HardwareProvider::GoogleQuantumAI, "google_backend")
1063            .unwrap();
1064        manager
1065            .connect(HardwareProvider::AzureQuantum, "azure_backend")
1066            .unwrap();
1067
1068        let connections = manager.get_connections().unwrap();
1069        assert_eq!(connections.len(), 3);
1070    }
1071
1072    #[test]
1073    fn test_job_completion() {
1074        let config = RealtimeConfig::default();
1075        let mut manager = RealtimeHardwareManager::new(config);
1076
1077        let conn_id = manager
1078            .connect(HardwareProvider::Rigetti, "backend")
1079            .unwrap();
1080
1081        manager.submit_job("job_complete", &conn_id).unwrap();
1082
1083        // Simulate job progress
1084        manager
1085            .update_job_status("job_complete", JobStatus::Running, 0.0)
1086            .unwrap();
1087        manager
1088            .update_job_status("job_complete", JobStatus::Running, 0.5)
1089            .unwrap();
1090        manager
1091            .update_job_status("job_complete", JobStatus::Completed, 1.0)
1092            .unwrap();
1093
1094        let status = manager.get_job_status("job_complete").unwrap();
1095        assert_eq!(status, JobStatus::Completed);
1096    }
1097}