quantrs2_sim/
realtime_hardware_integration.rs

1//! Real-Time Hardware Integration for Cloud Quantum Computers
2//!
3//! This module provides real-time integration capabilities with cloud quantum hardware,
4//! including live job monitoring, streaming results, dynamic calibration tracking,
5//! and real-time error rate monitoring. It enables responsive quantum-classical
6//! hybrid algorithms with immediate feedback from quantum hardware.
7//!
8//! # Features
9//! - Real-time job status monitoring with callbacks
10//! - Streaming measurement results for iterative algorithms
11//! - Dynamic hardware calibration tracking
12//! - Live error rate monitoring and adaptation
13//! - WebSocket-based event streaming (simulated)
14//! - Circuit execution progress tracking
15//! - Hardware availability notifications
16
17use quantrs2_core::error::{QuantRS2Error, QuantRS2Result};
18use scirs2_core::ndarray::Array1;
19use scirs2_core::Complex64;
20use serde::{Deserialize, Serialize};
21use std::collections::{HashMap, VecDeque};
22use std::sync::{Arc, Mutex, RwLock};
23use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
24
/// Real-time hardware integration manager.
///
/// Bundles the connection table, the job monitor, the calibration tracker, the
/// event stream, the configuration and the running statistics behind a single
/// entry point.
#[derive(Debug)]
pub struct RealtimeHardwareManager {
    /// Active hardware connections, keyed by connection ID
    connections: Arc<RwLock<HashMap<String, HardwareConnection>>>,
    /// Job monitor for tracking execution
    job_monitor: JobMonitor,
    /// Calibration tracker holding the calibration data per backend
    calibration_tracker: CalibrationTracker,
    /// Event stream for real-time updates
    event_stream: EventStream,
    /// Configuration supplied at construction time
    config: RealtimeConfig,
    /// Running statistics, guarded by a mutex
    stats: Arc<Mutex<RealtimeStats>>,
}
41
/// Configuration for real-time hardware integration.
///
/// NOTE(review): in the code visible in this file only `max_event_buffer` is
/// consumed (by `RealtimeHardwareManager::new`); confirm where the remaining
/// knobs are read before relying on them.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RealtimeConfig {
    /// Polling interval for job status (milliseconds); default 500
    pub polling_interval_ms: u64,
    /// Enable streaming results; default `true`
    pub enable_streaming: bool,
    /// Maximum event buffer size; default 1000
    pub max_event_buffer: usize,
    /// Calibration update interval (seconds); default 300
    pub calibration_update_interval: u64,
    /// Enable adaptive error mitigation; default `true`
    pub enable_adaptive_mitigation: bool,
    /// Timeout for real-time operations (seconds); default 3600
    pub operation_timeout: u64,
    /// Enable hardware availability notifications; default `true`
    pub enable_availability_notifications: bool,
    /// Maximum concurrent jobs; default 10
    pub max_concurrent_jobs: usize,
}
62
63impl Default for RealtimeConfig {
64    fn default() -> Self {
65        Self {
66            polling_interval_ms: 500,
67            enable_streaming: true,
68            max_event_buffer: 1000,
69            calibration_update_interval: 300,
70            enable_adaptive_mitigation: true,
71            operation_timeout: 3600,
72            enable_availability_notifications: true,
73            max_concurrent_jobs: 10,
74        }
75    }
76}
77
/// Hardware connection state, as stored in the manager's connection table.
#[derive(Debug, Clone)]
pub struct HardwareConnection {
    /// Connection ID (`RealtimeHardwareManager::connect` builds it as
    /// `"{provider:?}_{backend}"`)
    pub id: String,
    /// Hardware provider
    pub provider: HardwareProvider,
    /// Connection status
    pub status: ConnectionStatus,
    /// Backend name
    pub backend: String,
    /// Connection timestamp (seconds since the Unix epoch when set by `connect`)
    pub connected_at: u64,
    /// Last heartbeat (seconds since the Unix epoch; `connect` initialises it
    /// to the connection time)
    pub last_heartbeat: u64,
    /// Current calibration data (`connect` leaves this as `None`)
    pub calibration: Option<CalibrationData>,
}
96
/// Hardware provider types.
///
/// The variant's `Debug` name is used as the prefix of the connection ID built
/// by `RealtimeHardwareManager::connect`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub enum HardwareProvider {
    IBMQuantum,
    GoogleQuantumAI,
    AmazonBraket,
    AzureQuantum,
    IonQ,
    Rigetti,
    Xanadu,
    Pasqal,
}
109
/// Connection status.
///
/// `RealtimeHardwareManager::is_backend_available` reports a connection as
/// available only while it is `Connected`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ConnectionStatus {
    /// State assigned to every connection created by `connect`
    Connected,
    Connecting,
    Disconnected,
    Error,
    Maintenance,
}
119
/// Job monitor for tracking execution.
///
/// `Debug` is implemented by hand for this type because the boxed callback
/// closures do not implement `Debug`, so it cannot be derived.
pub struct JobMonitor {
    /// Active jobs being monitored, keyed by job ID
    active_jobs: Arc<RwLock<HashMap<String, JobState>>>,
    /// Job history
    job_history: Arc<Mutex<VecDeque<JobRecord>>>,
    /// Callback handlers for job events, keyed by job ID; each job may have
    /// several handlers
    callbacks: Arc<Mutex<HashMap<String, Vec<Box<dyn Fn(&JobEvent) + Send + Sync>>>>>,
}
129
130impl std::fmt::Debug for JobMonitor {
131    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
132        f.debug_struct("JobMonitor")
133            .field("active_jobs", &"<jobs>")
134            .field("job_history", &"<history>")
135            .field("callbacks", &"<callbacks>")
136            .finish()
137    }
138}
139
/// State of a monitored job, as held in the job monitor's active-job table.
#[derive(Debug, Clone)]
pub struct JobState {
    /// Job ID
    pub job_id: String,
    /// Current status
    pub status: JobStatus,
    /// Progress (0.0 - 1.0); the intended range is not enforced by this type
    pub progress: f64,
    /// Start time (`submit_job` records the moment of submission)
    pub start_time: Instant,
    /// Estimated completion time
    pub estimated_completion: Option<Duration>,
    /// Partial results (for streaming), in arrival order
    pub partial_results: Vec<PartialResult>,
    /// Error information
    pub error_info: Option<String>,
    /// Queue position (`submit_job` initialises it to `Some(1)`)
    pub queue_position: Option<usize>,
}
160
/// Job status.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum JobStatus {
    /// Initial status given to a job by `submit_job`
    Queued,
    Running,
    /// Counted in `RealtimeStats::jobs_completed` by `update_job_status`
    Completed,
    /// Counted in `RealtimeStats::jobs_failed` by `update_job_status`
    Failed,
    Cancelled,
    TimedOut,
}
171
/// Partial result for streaming.
#[derive(Debug, Clone)]
pub struct PartialResult {
    /// Result index (the job monitor assigns the number of partial results
    /// already stored for the job, so indices start at 0)
    pub index: usize,
    /// Measurement counts per outcome label
    /// (presumably bitstrings such as "00" — TODO confirm against producers)
    pub counts: HashMap<String, usize>,
    /// Timestamp (seconds since the Unix epoch when created by the job monitor)
    pub timestamp: u64,
}
182
/// Job event handed by reference to every callback registered for a job.
#[derive(Debug, Clone)]
pub struct JobEvent {
    /// Event type
    pub event_type: JobEventType,
    /// Job ID the event belongs to
    pub job_id: String,
    /// Event data (payload accompanying `event_type`)
    pub data: JobEventData,
    /// Timestamp (seconds since the Unix epoch at the time the event is built)
    pub timestamp: u64,
}
195
/// Types of job events.
///
/// NOTE(review): the job monitor code visible in this file only emits
/// `StatusChanged`, `ProgressUpdate` and `PartialResult`; confirm whether the
/// remaining variants are produced elsewhere.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum JobEventType {
    /// Emitted on every status update, paired with `JobEventData::Status`
    StatusChanged,
    /// Emitted on every status update, paired with `JobEventData::Progress`
    ProgressUpdate,
    /// Emitted when a partial result is appended, paired with `JobEventData::Result`
    PartialResult,
    Completed,
    Failed,
    QueuePositionChanged,
}
206
/// Job event data: the payload carried by a `JobEvent`.
#[derive(Debug, Clone)]
pub enum JobEventData {
    /// New job status
    Status(JobStatus),
    /// New progress value
    Progress(f64),
    /// Newly appended partial result
    Result(PartialResult),
    /// Error description
    Error(String),
    /// Queue position
    QueuePosition(usize),
    /// No payload
    None,
}
217
/// Job record for history.
///
/// NOTE(review): nothing in the code visible in this file constructs a
/// `JobRecord`; the unit of the two time fields is therefore not established
/// here (presumably seconds since the Unix epoch, like the other `u64`
/// timestamps — TODO confirm).
#[derive(Debug, Clone)]
pub struct JobRecord {
    /// Job ID
    pub job_id: String,
    /// Final status
    pub status: JobStatus,
    /// Start time
    pub start_time: u64,
    /// End time
    pub end_time: u64,
    /// Total shots
    pub total_shots: usize,
    /// Backend used
    pub backend: String,
}
234
/// Calibration tracker for hardware.
///
/// All three tables are keyed by backend name.
#[derive(Debug)]
pub struct CalibrationTracker {
    /// Current calibration data by backend (the latest update replaces the
    /// previous entry)
    calibrations: Arc<RwLock<HashMap<String, CalibrationData>>>,
    /// Calibration history: one summary snapshot appended per update
    history: Arc<Mutex<HashMap<String, VecDeque<CalibrationSnapshot>>>>,
    /// Last update times (monotonic instant of the most recent update)
    last_updates: Arc<Mutex<HashMap<String, Instant>>>,
}
245
/// Hardware calibration data for one backend.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CalibrationData {
    /// Backend name
    pub backend: String,
    /// Timestamp supplied by the caller
    /// (presumably seconds since the Unix epoch — TODO confirm)
    pub timestamp: u64,
    /// Single-qubit gate errors by qubit
    pub single_qubit_errors: HashMap<usize, f64>,
    /// Two-qubit gate errors by qubit pair
    ///
    /// NOTE(review): the key is a tuple; text formats that require string map
    /// keys (e.g. JSON) presumably cannot serialize this field as-is — verify
    /// against the serializers actually used.
    pub two_qubit_errors: HashMap<(usize, usize), f64>,
    /// Readout errors by qubit
    pub readout_errors: HashMap<usize, f64>,
    /// T1 times by qubit (microseconds)
    pub t1_times: HashMap<usize, f64>,
    /// T2 times by qubit (microseconds)
    pub t2_times: HashMap<usize, f64>,
    /// Gate durations (nanoseconds), keyed by gate name
    pub gate_durations: HashMap<String, f64>,
    /// Connectivity graph as a list of qubit-index pairs
    pub connectivity: Vec<(usize, usize)>,
}
268
/// Calibration snapshot for history: a compact summary of one calibration update.
///
/// The calibration tracker fills each average with the arithmetic mean of the
/// corresponding error table, using 0.0 when that table is empty.
#[derive(Debug, Clone)]
pub struct CalibrationSnapshot {
    /// Timestamp (copied from the calibration data being summarised)
    pub timestamp: u64,
    /// Average single-qubit error
    pub avg_single_qubit_error: f64,
    /// Average two-qubit error
    pub avg_two_qubit_error: f64,
    /// Average readout error
    pub avg_readout_error: f64,
}
281
/// Event stream for real-time updates.
///
/// NOTE(review): the code visible in this file only constructs the stream; no
/// visible method pushes events, enforces `max_size` or registers subscribers.
pub struct EventStream {
    /// Event buffer
    buffer: Arc<Mutex<VecDeque<HardwareEvent>>>,
    /// Maximum buffer size (taken from `RealtimeConfig::max_event_buffer`)
    max_size: usize,
    /// Event subscribers
    subscribers: Arc<Mutex<Vec<Box<dyn Fn(&HardwareEvent) + Send + Sync>>>>,
}
291
292impl std::fmt::Debug for EventStream {
293    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
294        f.debug_struct("EventStream")
295            .field("buffer", &"<buffer>")
296            .field("max_size", &self.max_size)
297            .field("subscribers", &"<subscribers>")
298            .finish()
299    }
300}
301
/// Hardware event: the element type of the event stream's buffer.
#[derive(Debug, Clone)]
pub struct HardwareEvent {
    /// Event type
    pub event_type: HardwareEventType,
    /// Source backend
    pub backend: String,
    /// Event data (payload accompanying `event_type`)
    pub data: HardwareEventData,
    /// Timestamp
    /// (presumably seconds since the Unix epoch, like the job timestamps — TODO confirm)
    pub timestamp: u64,
}
314
/// Types of hardware events.
///
/// NOTE(review): no code visible in this file emits these events yet.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum HardwareEventType {
    CalibrationUpdated,
    AvailabilityChanged,
    ErrorRateAlert,
    MaintenanceScheduled,
    JobQueued,
    JobStarted,
    JobCompleted,
}
326
/// Hardware event data: the payload carried by a `HardwareEvent`.
#[derive(Debug, Clone)]
pub enum HardwareEventData {
    /// Full calibration data
    Calibration(CalibrationData),
    /// Availability flag
    Availability(bool),
    /// Error rate value
    ErrorRate(f64),
    /// Maintenance window bounds
    /// (presumably Unix timestamps in seconds — TODO confirm)
    Maintenance { start: u64, end: u64 },
    /// Job identifier together with its shot count
    JobInfo { job_id: String, shots: usize },
    /// No payload
    None,
}
337
/// Statistics for real-time operations.
///
/// Every field starts at its `Default` value (zero / zero duration).
#[derive(Debug, Clone, Default)]
pub struct RealtimeStats {
    /// Total jobs monitored (incremented by each successful `submit_job`)
    pub jobs_monitored: u64,
    /// Jobs completed successfully
    pub jobs_completed: u64,
    /// Jobs failed
    pub jobs_failed: u64,
    /// Total events processed
    /// (NOTE(review): no code visible in this file updates this counter)
    pub events_processed: u64,
    /// Calibration updates received (incremented by each successful
    /// `update_calibration`)
    pub calibration_updates: u64,
    /// Average job completion time
    /// (NOTE(review): no code visible in this file updates this value)
    pub avg_completion_time: Duration,
    /// Current active connections
    pub active_connections: usize,
}
356
357impl RealtimeHardwareManager {
358    /// Create a new real-time hardware manager
359    #[must_use]
360    pub fn new(config: RealtimeConfig) -> Self {
361        Self {
362            connections: Arc::new(RwLock::new(HashMap::new())),
363            job_monitor: JobMonitor::new(),
364            calibration_tracker: CalibrationTracker::new(),
365            event_stream: EventStream::new(config.max_event_buffer),
366            config,
367            stats: Arc::new(Mutex::new(RealtimeStats::default())),
368        }
369    }
370
371    /// Connect to hardware backend
372    pub fn connect(&mut self, provider: HardwareProvider, backend: &str) -> QuantRS2Result<String> {
373        let conn_id = format!("{provider:?}_{backend}");
374        let now = SystemTime::now()
375            .duration_since(UNIX_EPOCH)
376            .unwrap_or_default()
377            .as_secs();
378
379        let connection = HardwareConnection {
380            id: conn_id.clone(),
381            provider,
382            status: ConnectionStatus::Connected,
383            backend: backend.to_string(),
384            connected_at: now,
385            last_heartbeat: now,
386            calibration: None,
387        };
388
389        let mut connections = self.connections.write().map_err(|_| {
390            QuantRS2Error::InvalidInput("Failed to acquire connections lock".to_string())
391        })?;
392        connections.insert(conn_id.clone(), connection);
393
394        // Update stats
395        let mut stats = self
396            .stats
397            .lock()
398            .map_err(|_| QuantRS2Error::InvalidInput("Failed to acquire stats lock".to_string()))?;
399        stats.active_connections += 1;
400
401        Ok(conn_id)
402    }
403
404    /// Disconnect from hardware backend
405    pub fn disconnect(&mut self, connection_id: &str) -> QuantRS2Result<()> {
406        let mut connections = self.connections.write().map_err(|_| {
407            QuantRS2Error::InvalidInput("Failed to acquire connections lock".to_string())
408        })?;
409
410        if connections.remove(connection_id).is_some() {
411            let mut stats = self.stats.lock().map_err(|_| {
412                QuantRS2Error::InvalidInput("Failed to acquire stats lock".to_string())
413            })?;
414            if stats.active_connections > 0 {
415                stats.active_connections -= 1;
416            }
417        }
418
419        Ok(())
420    }
421
422    /// Submit job for real-time monitoring
423    pub fn submit_job(&mut self, job_id: &str, connection_id: &str) -> QuantRS2Result<()> {
424        let connections = self.connections.read().map_err(|_| {
425            QuantRS2Error::InvalidInput("Failed to acquire connections lock".to_string())
426        })?;
427
428        if !connections.contains_key(connection_id) {
429            return Err(QuantRS2Error::InvalidInput(format!(
430                "Connection {connection_id} not found"
431            )));
432        }
433
434        // Create job state
435        let job_state = JobState {
436            job_id: job_id.to_string(),
437            status: JobStatus::Queued,
438            progress: 0.0,
439            start_time: Instant::now(),
440            estimated_completion: None,
441            partial_results: Vec::new(),
442            error_info: None,
443            queue_position: Some(1),
444        };
445
446        self.job_monitor.add_job(job_state)?;
447
448        // Update stats
449        let mut stats = self
450            .stats
451            .lock()
452            .map_err(|_| QuantRS2Error::InvalidInput("Failed to acquire stats lock".to_string()))?;
453        stats.jobs_monitored += 1;
454
455        Ok(())
456    }
457
458    /// Get job status
459    pub fn get_job_status(&self, job_id: &str) -> QuantRS2Result<JobStatus> {
460        self.job_monitor.get_status(job_id)
461    }
462
463    /// Get job progress
464    pub fn get_job_progress(&self, job_id: &str) -> QuantRS2Result<f64> {
465        self.job_monitor.get_progress(job_id)
466    }
467
468    /// Update job status (simulates receiving update from hardware)
469    pub fn update_job_status(
470        &mut self,
471        job_id: &str,
472        status: JobStatus,
473        progress: f64,
474    ) -> QuantRS2Result<()> {
475        self.job_monitor.update_status(job_id, status, progress)?;
476
477        // Update stats if completed
478        if status == JobStatus::Completed {
479            let mut stats = self.stats.lock().map_err(|_| {
480                QuantRS2Error::InvalidInput("Failed to acquire stats lock".to_string())
481            })?;
482            stats.jobs_completed += 1;
483        } else if status == JobStatus::Failed {
484            let mut stats = self.stats.lock().map_err(|_| {
485                QuantRS2Error::InvalidInput("Failed to acquire stats lock".to_string())
486            })?;
487            stats.jobs_failed += 1;
488        }
489
490        Ok(())
491    }
492
493    /// Add partial result for streaming
494    pub fn add_partial_result(
495        &mut self,
496        job_id: &str,
497        counts: HashMap<String, usize>,
498    ) -> QuantRS2Result<()> {
499        self.job_monitor.add_partial_result(job_id, counts)
500    }
501
502    /// Get partial results for job
503    pub fn get_partial_results(&self, job_id: &str) -> QuantRS2Result<Vec<PartialResult>> {
504        self.job_monitor.get_partial_results(job_id)
505    }
506
507    /// Update calibration data for backend
508    pub fn update_calibration(
509        &mut self,
510        backend: &str,
511        calibration: CalibrationData,
512    ) -> QuantRS2Result<()> {
513        self.calibration_tracker
514            .update_calibration(backend, calibration)?;
515
516        // Update stats
517        let mut stats = self
518            .stats
519            .lock()
520            .map_err(|_| QuantRS2Error::InvalidInput("Failed to acquire stats lock".to_string()))?;
521        stats.calibration_updates += 1;
522
523        Ok(())
524    }
525
526    /// Get current calibration for backend
527    pub fn get_calibration(&self, backend: &str) -> QuantRS2Result<Option<CalibrationData>> {
528        self.calibration_tracker.get_calibration(backend)
529    }
530
531    /// Get optimal qubits based on current calibration
532    pub fn get_optimal_qubits(
533        &self,
534        backend: &str,
535        num_qubits: usize,
536    ) -> QuantRS2Result<Vec<usize>> {
537        let calibration = self.get_calibration(backend)?;
538
539        match calibration {
540            Some(cal) => {
541                // Sort qubits by error rate (lowest first)
542                let mut qubits: Vec<(usize, f64)> = cal
543                    .single_qubit_errors
544                    .iter()
545                    .map(|(&q, &e)| (q, e))
546                    .collect();
547                qubits.sort_by(|a, b| a.1.partial_cmp(&b.1).unwrap_or(std::cmp::Ordering::Equal));
548
549                Ok(qubits
550                    .into_iter()
551                    .take(num_qubits)
552                    .map(|(q, _)| q)
553                    .collect())
554            }
555            None => {
556                // No calibration data, return sequential qubits
557                Ok((0..num_qubits).collect())
558            }
559        }
560    }
561
562    /// Get statistics
563    pub fn get_stats(&self) -> QuantRS2Result<RealtimeStats> {
564        let stats = self
565            .stats
566            .lock()
567            .map_err(|_| QuantRS2Error::InvalidInput("Failed to acquire stats lock".to_string()))?;
568        Ok(stats.clone())
569    }
570
571    /// Get active connections
572    pub fn get_connections(&self) -> QuantRS2Result<Vec<HardwareConnection>> {
573        let connections = self.connections.read().map_err(|_| {
574            QuantRS2Error::InvalidInput("Failed to acquire connections lock".to_string())
575        })?;
576        Ok(connections.values().cloned().collect())
577    }
578
579    /// Register job event callback
580    pub fn register_job_callback<F>(&mut self, job_id: &str, callback: F) -> QuantRS2Result<()>
581    where
582        F: Fn(&JobEvent) + Send + Sync + 'static,
583    {
584        self.job_monitor.register_callback(job_id, callback)
585    }
586
587    /// Check if backend is available
588    pub fn is_backend_available(&self, connection_id: &str) -> QuantRS2Result<bool> {
589        let connections = self.connections.read().map_err(|_| {
590            QuantRS2Error::InvalidInput("Failed to acquire connections lock".to_string())
591        })?;
592
593        match connections.get(connection_id) {
594            Some(conn) => Ok(conn.status == ConnectionStatus::Connected),
595            None => Ok(false),
596        }
597    }
598}
599
600impl JobMonitor {
601    /// Create new job monitor
602    fn new() -> Self {
603        Self {
604            active_jobs: Arc::new(RwLock::new(HashMap::new())),
605            job_history: Arc::new(Mutex::new(VecDeque::new())),
606            callbacks: Arc::new(Mutex::new(HashMap::new())),
607        }
608    }
609
610    /// Add job for monitoring
611    fn add_job(&self, job_state: JobState) -> QuantRS2Result<()> {
612        let mut jobs = self
613            .active_jobs
614            .write()
615            .map_err(|_| QuantRS2Error::InvalidInput("Failed to acquire jobs lock".to_string()))?;
616        jobs.insert(job_state.job_id.clone(), job_state);
617        Ok(())
618    }
619
620    /// Get job status
621    fn get_status(&self, job_id: &str) -> QuantRS2Result<JobStatus> {
622        let jobs = self
623            .active_jobs
624            .read()
625            .map_err(|_| QuantRS2Error::InvalidInput("Failed to acquire jobs lock".to_string()))?;
626
627        jobs.get(job_id)
628            .map(|j| j.status)
629            .ok_or_else(|| QuantRS2Error::InvalidInput(format!("Job {job_id} not found")))
630    }
631
632    /// Get job progress
633    fn get_progress(&self, job_id: &str) -> QuantRS2Result<f64> {
634        let jobs = self
635            .active_jobs
636            .read()
637            .map_err(|_| QuantRS2Error::InvalidInput("Failed to acquire jobs lock".to_string()))?;
638
639        jobs.get(job_id)
640            .map(|j| j.progress)
641            .ok_or_else(|| QuantRS2Error::InvalidInput(format!("Job {job_id} not found")))
642    }
643
644    /// Update job status
645    fn update_status(&self, job_id: &str, status: JobStatus, progress: f64) -> QuantRS2Result<()> {
646        let mut jobs = self
647            .active_jobs
648            .write()
649            .map_err(|_| QuantRS2Error::InvalidInput("Failed to acquire jobs lock".to_string()))?;
650
651        if let Some(job) = jobs.get_mut(job_id) {
652            job.status = status;
653            job.progress = progress;
654
655            // Trigger callbacks
656            self.trigger_callback(
657                job_id,
658                JobEventType::StatusChanged,
659                JobEventData::Status(status),
660            )?;
661            self.trigger_callback(
662                job_id,
663                JobEventType::ProgressUpdate,
664                JobEventData::Progress(progress),
665            )?;
666        }
667
668        Ok(())
669    }
670
671    /// Add partial result
672    fn add_partial_result(
673        &self,
674        job_id: &str,
675        counts: HashMap<String, usize>,
676    ) -> QuantRS2Result<()> {
677        let mut jobs = self
678            .active_jobs
679            .write()
680            .map_err(|_| QuantRS2Error::InvalidInput("Failed to acquire jobs lock".to_string()))?;
681
682        if let Some(job) = jobs.get_mut(job_id) {
683            let result = PartialResult {
684                index: job.partial_results.len(),
685                counts,
686                timestamp: SystemTime::now()
687                    .duration_since(UNIX_EPOCH)
688                    .unwrap_or_default()
689                    .as_secs(),
690            };
691            job.partial_results.push(result.clone());
692
693            // Trigger callback
694            drop(jobs);
695            self.trigger_callback(
696                job_id,
697                JobEventType::PartialResult,
698                JobEventData::Result(result),
699            )?;
700        }
701
702        Ok(())
703    }
704
705    /// Get partial results
706    fn get_partial_results(&self, job_id: &str) -> QuantRS2Result<Vec<PartialResult>> {
707        let jobs = self
708            .active_jobs
709            .read()
710            .map_err(|_| QuantRS2Error::InvalidInput("Failed to acquire jobs lock".to_string()))?;
711
712        jobs.get(job_id)
713            .map(|j| j.partial_results.clone())
714            .ok_or_else(|| QuantRS2Error::InvalidInput(format!("Job {job_id} not found")))
715    }
716
717    /// Register callback for job events
718    fn register_callback<F>(&self, job_id: &str, callback: F) -> QuantRS2Result<()>
719    where
720        F: Fn(&JobEvent) + Send + Sync + 'static,
721    {
722        let mut callbacks = self.callbacks.lock().map_err(|_| {
723            QuantRS2Error::InvalidInput("Failed to acquire callbacks lock".to_string())
724        })?;
725
726        callbacks
727            .entry(job_id.to_string())
728            .or_insert_with(Vec::new)
729            .push(Box::new(callback));
730
731        Ok(())
732    }
733
734    /// Trigger callbacks for event
735    fn trigger_callback(
736        &self,
737        job_id: &str,
738        event_type: JobEventType,
739        data: JobEventData,
740    ) -> QuantRS2Result<()> {
741        let callbacks = self.callbacks.lock().map_err(|_| {
742            QuantRS2Error::InvalidInput("Failed to acquire callbacks lock".to_string())
743        })?;
744
745        if let Some(handlers) = callbacks.get(job_id) {
746            let event = JobEvent {
747                event_type,
748                job_id: job_id.to_string(),
749                data,
750                timestamp: SystemTime::now()
751                    .duration_since(UNIX_EPOCH)
752                    .unwrap_or_default()
753                    .as_secs(),
754            };
755
756            for handler in handlers {
757                handler(&event);
758            }
759        }
760
761        Ok(())
762    }
763}
764
765impl CalibrationTracker {
766    /// Create new calibration tracker
767    fn new() -> Self {
768        Self {
769            calibrations: Arc::new(RwLock::new(HashMap::new())),
770            history: Arc::new(Mutex::new(HashMap::new())),
771            last_updates: Arc::new(Mutex::new(HashMap::new())),
772        }
773    }
774
775    /// Update calibration data
776    fn update_calibration(
777        &self,
778        backend: &str,
779        calibration: CalibrationData,
780    ) -> QuantRS2Result<()> {
781        // Calculate snapshot
782        let avg_single = if calibration.single_qubit_errors.is_empty() {
783            0.0
784        } else {
785            calibration.single_qubit_errors.values().sum::<f64>()
786                / calibration.single_qubit_errors.len() as f64
787        };
788
789        let avg_two = if calibration.two_qubit_errors.is_empty() {
790            0.0
791        } else {
792            calibration.two_qubit_errors.values().sum::<f64>()
793                / calibration.two_qubit_errors.len() as f64
794        };
795
796        let avg_readout = if calibration.readout_errors.is_empty() {
797            0.0
798        } else {
799            calibration.readout_errors.values().sum::<f64>()
800                / calibration.readout_errors.len() as f64
801        };
802
803        let snapshot = CalibrationSnapshot {
804            timestamp: calibration.timestamp,
805            avg_single_qubit_error: avg_single,
806            avg_two_qubit_error: avg_two,
807            avg_readout_error: avg_readout,
808        };
809
810        // Store calibration
811        let mut calibrations = self.calibrations.write().map_err(|_| {
812            QuantRS2Error::InvalidInput("Failed to acquire calibrations lock".to_string())
813        })?;
814        calibrations.insert(backend.to_string(), calibration);
815
816        // Store snapshot in history
817        let mut history = self.history.lock().map_err(|_| {
818            QuantRS2Error::InvalidInput("Failed to acquire history lock".to_string())
819        })?;
820        history
821            .entry(backend.to_string())
822            .or_insert_with(VecDeque::new)
823            .push_back(snapshot);
824
825        // Update last update time
826        let mut last_updates = self.last_updates.lock().map_err(|_| {
827            QuantRS2Error::InvalidInput("Failed to acquire last_updates lock".to_string())
828        })?;
829        last_updates.insert(backend.to_string(), Instant::now());
830
831        Ok(())
832    }
833
834    /// Get current calibration
835    fn get_calibration(&self, backend: &str) -> QuantRS2Result<Option<CalibrationData>> {
836        let calibrations = self.calibrations.read().map_err(|_| {
837            QuantRS2Error::InvalidInput("Failed to acquire calibrations lock".to_string())
838        })?;
839        Ok(calibrations.get(backend).cloned())
840    }
841}
842
843impl EventStream {
844    /// Create new event stream
845    fn new(max_size: usize) -> Self {
846        Self {
847            buffer: Arc::new(Mutex::new(VecDeque::new())),
848            max_size,
849            subscribers: Arc::new(Mutex::new(Vec::new())),
850        }
851    }
852}
853
854#[cfg(test)]
855mod tests {
856    use super::*;
857
858    #[test]
859    fn test_realtime_manager_creation() {
860        let config = RealtimeConfig::default();
861        let manager = RealtimeHardwareManager::new(config);
862        assert!(manager.get_stats().is_ok());
863    }
864
865    #[test]
866    fn test_connect_disconnect() {
867        let config = RealtimeConfig::default();
868        let mut manager = RealtimeHardwareManager::new(config);
869
870        let conn_id = manager
871            .connect(HardwareProvider::IBMQuantum, "ibm_qasm_simulator")
872            .expect("Connection should succeed");
873        assert!(!conn_id.is_empty());
874
875        let connections = manager
876            .get_connections()
877            .expect("Get connections should succeed");
878        assert_eq!(connections.len(), 1);
879
880        manager
881            .disconnect(&conn_id)
882            .expect("Disconnect should succeed");
883        let connections = manager
884            .get_connections()
885            .expect("Get connections should succeed");
886        assert_eq!(connections.len(), 0);
887    }
888
889    #[test]
890    fn test_job_monitoring() {
891        let config = RealtimeConfig::default();
892        let mut manager = RealtimeHardwareManager::new(config);
893
894        let conn_id = manager
895            .connect(HardwareProvider::IBMQuantum, "ibm_qasm_simulator")
896            .expect("Connection should succeed");
897
898        manager
899            .submit_job("job_123", &conn_id)
900            .expect("Job submission should succeed");
901
902        let status = manager
903            .get_job_status("job_123")
904            .expect("Get job status should succeed");
905        assert_eq!(status, JobStatus::Queued);
906
907        let progress = manager
908            .get_job_progress("job_123")
909            .expect("Get job progress should succeed");
910        assert_eq!(progress, 0.0);
911    }
912
913    #[test]
914    fn test_job_status_update() {
915        let config = RealtimeConfig::default();
916        let mut manager = RealtimeHardwareManager::new(config);
917
918        let conn_id = manager
919            .connect(HardwareProvider::IBMQuantum, "backend")
920            .expect("Connection should succeed");
921
922        manager
923            .submit_job("job_456", &conn_id)
924            .expect("Job submission should succeed");
925
926        manager
927            .update_job_status("job_456", JobStatus::Running, 0.5)
928            .expect("Status update should succeed");
929        let status = manager
930            .get_job_status("job_456")
931            .expect("Get job status should succeed");
932        assert_eq!(status, JobStatus::Running);
933
934        let progress = manager
935            .get_job_progress("job_456")
936            .expect("Get job progress should succeed");
937        assert_eq!(progress, 0.5);
938    }
939
/// A partial result added to a submitted job can be read back, and its
/// counts map still holds the inserted value.
#[test]
fn test_partial_results() {
    let mut manager = RealtimeHardwareManager::new(RealtimeConfig::default());

    let connection = manager
        .connect(HardwareProvider::GoogleQuantumAI, "backend")
        .expect("Connection should succeed");
    manager
        .submit_job("job_789", &connection)
        .expect("Job submission should succeed");

    // Two outcome counts, keyed by "00" and "11".
    let counts = HashMap::from([("00".to_string(), 450), ("11".to_string(), 550)]);
    manager
        .add_partial_result("job_789", counts)
        .expect("Add partial result should succeed");

    let partials = manager
        .get_partial_results("job_789")
        .expect("Get partial results should succeed");
    assert_eq!(partials.len(), 1);
    assert_eq!(partials[0].counts.get("00"), Some(&450));
}
967
/// Calibration data registered for a backend can be fetched again and
/// still carries all three single-qubit error entries.
#[test]
fn test_calibration_tracking() {
    let mut manager = RealtimeHardwareManager::new(RealtimeConfig::default());

    // Only the single-qubit errors and connectivity are populated; every
    // other table is left empty.
    let snapshot = CalibrationData {
        backend: "test_backend".to_string(),
        timestamp: 12_345,
        single_qubit_errors: HashMap::from([(0, 0.001), (1, 0.002), (2, 0.0015)]),
        two_qubit_errors: HashMap::new(),
        readout_errors: HashMap::new(),
        t1_times: HashMap::new(),
        t2_times: HashMap::new(),
        gate_durations: HashMap::new(),
        connectivity: vec![(0, 1), (1, 2)],
    };
    manager
        .update_calibration("test_backend", snapshot)
        .expect("Calibration update should succeed");

    let cal = manager
        .get_calibration("test_backend")
        .expect("Get calibration should succeed");
    assert!(cal.is_some());

    let stored = cal.expect("Calibration data should exist");
    assert_eq!(stored.single_qubit_errors.len(), 3);
}
1005
/// Asking for two optimal qubits on a backend with four calibrated qubits
/// must yield the two with the smallest single-qubit error values.
#[test]
fn test_optimal_qubits() {
    let mut manager = RealtimeHardwareManager::new(RealtimeConfig::default());

    // Qubits 1 (0.001) and 3 (0.002) have the smallest error values.
    let snapshot = CalibrationData {
        backend: "backend".to_string(),
        timestamp: 12_345,
        single_qubit_errors: HashMap::from([(0, 0.005), (1, 0.001), (2, 0.003), (3, 0.002)]),
        two_qubit_errors: HashMap::new(),
        readout_errors: HashMap::new(),
        t1_times: HashMap::new(),
        t2_times: HashMap::new(),
        gate_durations: HashMap::new(),
        connectivity: vec![],
    };
    manager
        .update_calibration("backend", snapshot)
        .expect("Calibration update should succeed");

    let optimal = manager
        .get_optimal_qubits("backend", 2)
        .expect("Get optimal qubits should succeed");

    assert_eq!(optimal.len(), 2);
    // Order is not checked, only membership.
    assert!(optimal.contains(&1));
    assert!(optimal.contains(&3));
}
1041
/// The id returned by `connect` is reported as available, while an id that
/// was never issued is reported as unavailable (without an error).
#[test]
fn test_backend_availability() {
    let mut manager = RealtimeHardwareManager::new(RealtimeConfig::default());

    let connection = manager
        .connect(HardwareProvider::AmazonBraket, "backend")
        .expect("Connection should succeed");

    let known_is_available = manager
        .is_backend_available(&connection)
        .expect("Backend availability check should succeed");
    assert!(known_is_available);

    let unknown_is_available = manager
        .is_backend_available("nonexistent")
        .expect("Backend availability check should succeed");
    assert!(!unknown_is_available);
}
1058
/// Submitting two jobs and finishing one as `Completed` and the other as
/// `Failed` must show up in the monitored/completed/failed counters.
#[test]
fn test_statistics() {
    let mut manager = RealtimeHardwareManager::new(RealtimeConfig::default());

    let connection = manager
        .connect(HardwareProvider::IonQ, "backend")
        .expect("Connection should succeed");

    for job_id in ["job_a", "job_b"] {
        manager
            .submit_job(job_id, &connection)
            .expect("Job submission should succeed");
    }

    // Final state for each job: (id, status, progress).
    let final_states = [
        ("job_a", JobStatus::Completed, 1.0),
        ("job_b", JobStatus::Failed, 0.5),
    ];
    for (job_id, status, progress) in final_states {
        manager
            .update_job_status(job_id, status, progress)
            .expect("Status update should succeed");
    }

    let stats = manager.get_stats().expect("Get stats should succeed");
    assert_eq!(stats.jobs_monitored, 2);
    assert_eq!(stats.jobs_completed, 1);
    assert_eq!(stats.jobs_failed, 1);
}
1087
/// Pins the values produced by `RealtimeConfig::default()` for the five
/// fields checked below.
#[test]
fn test_config_defaults() {
    // Pull out just the fields under test; the rest are ignored.
    let RealtimeConfig {
        polling_interval_ms,
        enable_streaming,
        max_event_buffer,
        enable_adaptive_mitigation,
        max_concurrent_jobs,
        ..
    } = RealtimeConfig::default();

    assert_eq!(polling_interval_ms, 500);
    assert!(enable_streaming);
    assert_eq!(max_event_buffer, 1000);
    assert!(enable_adaptive_mitigation);
    assert_eq!(max_concurrent_jobs, 10);
}
1098
/// Connecting to three different providers must leave three entries in the
/// manager's connection list.
#[test]
fn test_multiple_providers() {
    let mut manager = RealtimeHardwareManager::new(RealtimeConfig::default());

    // (provider, backend name, message used if the connection fails)
    let targets = [
        (
            HardwareProvider::IBMQuantum,
            "ibm_backend",
            "IBM connection should succeed",
        ),
        (
            HardwareProvider::GoogleQuantumAI,
            "google_backend",
            "Google connection should succeed",
        ),
        (
            HardwareProvider::AzureQuantum,
            "azure_backend",
            "Azure connection should succeed",
        ),
    ];
    for (provider, backend, failure_message) in targets {
        manager.connect(provider, backend).expect(failure_message);
    }

    let connections = manager
        .get_connections()
        .expect("Get connections should succeed");
    assert_eq!(connections.len(), 3);
}
1119
/// Drives a job through two `Running` updates and a final `Completed`
/// update, then checks that the last status is the one reported.
#[test]
fn test_job_completion() {
    let mut manager = RealtimeHardwareManager::new(RealtimeConfig::default());

    let connection = manager
        .connect(HardwareProvider::Rigetti, "backend")
        .expect("Connection should succeed");
    manager
        .submit_job("job_complete", &connection)
        .expect("Job submission should succeed");

    // Simulated progression, applied in order: (status, progress).
    let timeline = [
        (JobStatus::Running, 0.0),
        (JobStatus::Running, 0.5),
        (JobStatus::Completed, 1.0),
    ];
    for (status, progress) in timeline {
        manager
            .update_job_status("job_complete", status, progress)
            .expect("Status update should succeed");
    }

    assert_eq!(
        manager
            .get_job_status("job_complete")
            .expect("Get job status should succeed"),
        JobStatus::Completed
    );
}
1149}