quantrs2_anneal/scientific_performance_optimization/
distributed.rs

1//! Distributed computing types for scientific performance optimization.
2//!
3//! This module contains cluster management, communication,
4//! fault tolerance, and distributed coordination.
5
6use std::collections::{HashMap, VecDeque};
7use std::time::Instant;
8
9use super::config::{
10    ClusterConfig, CommunicationProtocol, DistributedComputingConfig, NodeResources,
11};
12
13/// Distributed coordinator for cluster computing
14pub struct DistributedCoordinator {
15    /// Configuration
16    pub config: DistributedComputingConfig,
17    /// Cluster manager
18    pub cluster_manager: ClusterManager,
19    /// Communication manager
20    pub communication_manager: CommunicationManager,
21    /// Fault tolerance manager
22    pub fault_tolerance_manager: FaultToleranceManager,
23}
24
25impl DistributedCoordinator {
26    /// Create a new distributed coordinator
27    #[must_use]
28    pub fn new(config: DistributedComputingConfig) -> Self {
29        Self {
30            config,
31            cluster_manager: ClusterManager::new(),
32            communication_manager: CommunicationManager::new(),
33            fault_tolerance_manager: FaultToleranceManager::new(),
34        }
35    }
36
37    /// Check if distributed computing is enabled
38    #[must_use]
39    pub fn is_enabled(&self) -> bool {
40        self.config.enable_distributed
41    }
42
43    /// Get cluster size
44    #[must_use]
45    pub fn cluster_size(&self) -> usize {
46        self.cluster_manager.active_nodes.len()
47    }
48}
49
50/// Cluster manager for node coordination
51#[derive(Debug)]
52pub struct ClusterManager {
53    /// Cluster configuration
54    pub config: ClusterConfig,
55    /// Active nodes
56    pub active_nodes: HashMap<String, ClusterNode>,
57    /// Node statistics
58    pub node_statistics: HashMap<String, NodeStatistics>,
59}
60
61impl ClusterManager {
62    /// Create a new cluster manager
63    #[must_use]
64    pub fn new() -> Self {
65        Self {
66            config: ClusterConfig::default(),
67            active_nodes: HashMap::new(),
68            node_statistics: HashMap::new(),
69        }
70    }
71
72    /// Add a node to the cluster
73    pub fn add_node(&mut self, address: String, resources: NodeResources) {
74        let node = ClusterNode {
75            address: address.clone(),
76            resources,
77            status: NodeStatus::Active,
78            current_workload: NodeWorkload::default(),
79        };
80        self.active_nodes.insert(address.clone(), node);
81        self.node_statistics
82            .insert(address, NodeStatistics::default());
83    }
84
85    /// Remove a node from the cluster
86    pub fn remove_node(&mut self, address: &str) -> Option<ClusterNode> {
87        self.node_statistics.remove(address);
88        self.active_nodes.remove(address)
89    }
90
91    /// Get available nodes
92    #[must_use]
93    pub fn available_nodes(&self) -> Vec<&ClusterNode> {
94        self.active_nodes
95            .values()
96            .filter(|n| n.status == NodeStatus::Active)
97            .collect()
98    }
99
100    /// Update node status
101    pub fn update_node_status(&mut self, address: &str, status: NodeStatus) {
102        if let Some(node) = self.active_nodes.get_mut(address) {
103            node.status = status;
104        }
105    }
106
107    /// Get node by address
108    #[must_use]
109    pub fn get_node(&self, address: &str) -> Option<&ClusterNode> {
110        self.active_nodes.get(address)
111    }
112}
113
114impl Default for ClusterManager {
115    fn default() -> Self {
116        Self::new()
117    }
118}
119
120/// Cluster node representation
121#[derive(Debug)]
122pub struct ClusterNode {
123    /// Node address
124    pub address: String,
125    /// Node resources
126    pub resources: NodeResources,
127    /// Node status
128    pub status: NodeStatus,
129    /// Current workload
130    pub current_workload: NodeWorkload,
131}
132
133impl ClusterNode {
134    /// Check if node is available for work
135    #[must_use]
136    pub fn is_available(&self) -> bool {
137        self.status == NodeStatus::Active && self.current_workload.cpu_utilization < 0.9
138    }
139
140    /// Get available capacity
141    #[must_use]
142    pub fn available_capacity(&self) -> f64 {
143        1.0 - self.current_workload.cpu_utilization
144    }
145}
146
/// Lifecycle state of a cluster node.
///
/// The enum carries no data, so it is `Copy` and `Hash` in addition to the
/// comparison traits; values can be passed by value and used as map/set keys.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum NodeStatus {
    /// Node is active and available
    Active,
    /// Node is busy
    Busy,
    /// Node is temporarily unavailable
    Unavailable,
    /// Node has failed
    Failed,
    /// Node is in maintenance
    Maintenance,
}
161
/// Snapshot of the work a node is currently carrying.
///
/// `Default` is derived: it yields no active tasks and `0.0` for every
/// utilization figure, exactly what the previous hand-written impl produced.
#[derive(Debug, Clone, Default)]
pub struct NodeWorkload {
    /// Identifiers of the tasks currently running on the node.
    pub active_tasks: Vec<String>,
    /// CPU utilization. `ClusterNode` compares it against `0.9` and subtracts
    /// it from `1.0`, i.e. it is treated as a fraction of full capacity.
    pub cpu_utilization: f64,
    /// Memory utilization (presumably the same 0.0..=1.0 scale — TODO confirm).
    pub memory_utilization: f64,
    /// Network utilization (presumably the same 0.0..=1.0 scale — TODO confirm).
    pub network_utilization: f64,
}

impl NodeWorkload {
    /// Returns the arithmetic mean of CPU, memory, and network utilization.
    #[must_use]
    pub fn overall_load(&self) -> f64 {
        (self.cpu_utilization + self.memory_utilization + self.network_utilization) / 3.0
    }
}
193
194/// Communication manager for inter-node communication
195#[derive(Debug)]
196pub struct CommunicationManager {
197    /// Communication protocol
198    pub protocol: CommunicationProtocol,
199    /// Active connections
200    pub connections: HashMap<String, Connection>,
201    /// Message queues
202    pub message_queues: HashMap<String, VecDeque<Message>>,
203    /// Communication statistics
204    pub statistics: CommunicationStatistics,
205}
206
207impl CommunicationManager {
208    /// Create a new communication manager
209    #[must_use]
210    pub fn new() -> Self {
211        Self {
212            protocol: CommunicationProtocol::TCP,
213            connections: HashMap::new(),
214            message_queues: HashMap::new(),
215            statistics: CommunicationStatistics::default(),
216        }
217    }
218
219    /// Establish connection to a node
220    pub fn connect(&mut self, address: &str) -> Result<(), String> {
221        if self.connections.contains_key(address) {
222            return Ok(());
223        }
224
225        let connection = Connection {
226            id: format!("conn_{}", self.connections.len()),
227            remote_address: address.to_string(),
228            status: ConnectionStatus::Active,
229            statistics: ConnectionStatistics::default(),
230        };
231
232        self.connections.insert(address.to_string(), connection);
233        self.message_queues
234            .insert(address.to_string(), VecDeque::new());
235        self.statistics.connections_established += 1;
236
237        Ok(())
238    }
239
240    /// Disconnect from a node
241    pub fn disconnect(&mut self, address: &str) {
242        if let Some(mut conn) = self.connections.remove(address) {
243            conn.status = ConnectionStatus::Disconnected;
244            self.statistics.connections_closed += 1;
245        }
246        self.message_queues.remove(address);
247    }
248
249    /// Send a message
250    pub fn send(&mut self, destination: &str, message: Message) -> Result<(), String> {
251        if let Some(queue) = self.message_queues.get_mut(destination) {
252            queue.push_back(message);
253            self.statistics.messages_sent += 1;
254            Ok(())
255        } else {
256            Err(format!("No connection to {destination}"))
257        }
258    }
259
260    /// Receive messages from a node
261    pub fn receive(&mut self, source: &str) -> Vec<Message> {
262        let mut messages = Vec::new();
263        if let Some(queue) = self.message_queues.get_mut(source) {
264            while let Some(msg) = queue.pop_front() {
265                messages.push(msg);
266                self.statistics.messages_received += 1;
267            }
268        }
269        messages
270    }
271}
272
273impl Default for CommunicationManager {
274    fn default() -> Self {
275        Self::new()
276    }
277}
278
279/// Connection representation
280#[derive(Debug)]
281pub struct Connection {
282    /// Connection identifier
283    pub id: String,
284    /// Remote address
285    pub remote_address: String,
286    /// Connection status
287    pub status: ConnectionStatus,
288    /// Connection statistics
289    pub statistics: ConnectionStatistics,
290}
291
/// State of a [`Connection`].
///
/// The enum carries no data, so it is `Copy` and `Hash` in addition to the
/// comparison traits.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum ConnectionStatus {
    /// Connection is active
    Active,
    /// Connection is being established
    Connecting,
    /// Connection is temporarily disconnected
    Disconnected,
    /// Connection has failed
    Failed,
}
304
305/// Message for inter-node communication
306#[derive(Debug, Clone)]
307pub struct Message {
308    /// Message identifier
309    pub id: String,
310    /// Source node
311    pub source: String,
312    /// Destination node
313    pub destination: String,
314    /// Message type
315    pub message_type: MessageType,
316    /// Message payload
317    pub payload: Vec<u8>,
318    /// Timestamp
319    pub timestamp: Instant,
320}
321
322impl Message {
323    /// Create a new message
324    #[must_use]
325    pub fn new(
326        source: String,
327        destination: String,
328        message_type: MessageType,
329        payload: Vec<u8>,
330    ) -> Self {
331        Self {
332            id: uuid_simple(),
333            source,
334            destination,
335            message_type,
336            payload,
337            timestamp: Instant::now(),
338        }
339    }
340}
341
/// Kind of a [`Message`].
///
/// The enum carries no data, so it is `Copy` and `Hash` in addition to the
/// comparison traits; this allows, for example, per-type counters in a map.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum MessageType {
    /// Task assignment
    TaskAssignment,
    /// Task result
    TaskResult,
    /// Heartbeat
    Heartbeat,
    /// Status update
    StatusUpdate,
    /// Error notification
    Error,
    /// Control message
    Control,
}
358
/// Fault tolerance manager.
///
/// Tracks which nodes are currently considered failed, how often each node
/// has been reported as failed, and the latest checkpoint of each task.
#[derive(Debug, Clone, Default)]
pub struct FaultToleranceManager {
    /// Addresses of nodes currently recorded as failed, in order of first
    /// report. `record_failure` never adds an address twice.
    pub failed_nodes: Vec<String>,
    /// Per-node counter bumped by every `record_failure` call. It is not
    /// reset by `record_recovery`.
    pub recovery_attempts: HashMap<String, u32>,
    /// Latest checkpoint per task, keyed by task identifier.
    pub checkpoints: HashMap<String, Checkpoint>,
}

impl FaultToleranceManager {
    /// Creates an empty manager (same as `Default::default()`).
    #[must_use]
    pub fn new() -> Self {
        Self::default()
    }

    /// Records that `node_address` has failed.
    ///
    /// The address is added to `failed_nodes` unless it is already listed, and
    /// its counter in `recovery_attempts` is incremented on every call.
    pub fn record_failure(&mut self, node_address: &str) {
        // Compare as `&str` so the membership test does not allocate.
        let already_listed = self.failed_nodes.iter().any(|n| n == node_address);
        if !already_listed {
            self.failed_nodes.push(node_address.to_owned());
        }
        *self
            .recovery_attempts
            .entry(node_address.to_owned())
            .or_insert(0) += 1;
    }

    /// Records that `node_address` has recovered by removing it from
    /// `failed_nodes`. The per-node counter is left untouched.
    pub fn record_recovery(&mut self, node_address: &str) {
        self.failed_nodes.retain(|n| n != node_address);
    }

    /// Stores `data` as the checkpoint for `task_id`, stamped with the current
    /// instant. An existing checkpoint for the same task is replaced.
    pub fn create_checkpoint(&mut self, task_id: &str, data: Vec<u8>) {
        let checkpoint = Checkpoint {
            task_id: task_id.to_owned(),
            data,
            timestamp: Instant::now(),
        };
        self.checkpoints.insert(task_id.to_owned(), checkpoint);
    }

    /// Returns the stored checkpoint for `task_id`, if any.
    #[must_use]
    pub fn get_checkpoint(&self, task_id: &str) -> Option<&Checkpoint> {
        self.checkpoints.get(task_id)
    }
}

/// Checkpoint for fault recovery.
#[derive(Debug, Clone)]
pub struct Checkpoint {
    /// Identifier of the task the checkpoint belongs to.
    pub task_id: String,
    /// Opaque checkpoint bytes.
    pub data: Vec<u8>,
    /// Moment the checkpoint was created.
    pub timestamp: Instant,
}
424
425// Statistics types
426
/// Task counters and timing totals kept per cluster node.
///
/// The derived `Default` starts every counter at zero and every duration at
/// zero length.
#[derive(Clone, Debug, Default)]
pub struct NodeStatistics {
    /// Number of tasks that finished.
    pub tasks_completed: u64,
    /// Number of tasks that failed.
    pub tasks_failed: u64,
    /// Execution time summed over tasks.
    pub total_execution_time: std::time::Duration,
    /// Mean time per task.
    pub avg_task_time: std::time::Duration,
}
439
/// Traffic counters kept per connection.
///
/// The derived `Default` starts every counter at zero.
#[derive(Clone, Debug, Default)]
pub struct ConnectionStatistics {
    /// Number of bytes sent.
    pub bytes_sent: u64,
    /// Number of bytes received.
    pub bytes_received: u64,
    /// Number of messages sent.
    pub messages_sent: u64,
    /// Number of messages received.
    pub messages_received: u64,
    /// Number of errors observed.
    pub errors: u64,
}
454
/// Aggregate counters kept by the communication manager.
///
/// The derived `Default` starts every counter at zero.
#[derive(Clone, Debug, Default)]
pub struct CommunicationStatistics {
    /// Number of connections opened.
    pub connections_established: u64,
    /// Number of connections closed.
    pub connections_closed: u64,
    /// Number of messages sent.
    pub messages_sent: u64,
    /// Number of messages received.
    pub messages_received: u64,
    /// Total number of bytes transferred.
    pub total_bytes: u64,
}
469
/// Generate a simple UUID-like string of the form `msg_<nanos>_<sequence>`.
///
/// `<nanos>` is the wall-clock time since the Unix epoch in hexadecimal
/// nanoseconds (`0` when the clock reads earlier than the epoch). `<sequence>`
/// is a process-wide counter in hexadecimal. The counter keeps ids distinct
/// within a process even when two calls observe the same clock value.
fn uuid_simple() -> String {
    use std::sync::atomic::{AtomicU64, Ordering};
    use std::time::SystemTime;

    // Only distinctness matters here, not ordering relative to other memory
    // operations, so a relaxed increment is sufficient.
    static SEQUENCE: AtomicU64 = AtomicU64::new(0);

    let now = SystemTime::now()
        .duration_since(SystemTime::UNIX_EPOCH)
        .map_or(0, |elapsed| elapsed.as_nanos());
    let sequence = SEQUENCE.fetch_add(1, Ordering::Relaxed);

    format!("msg_{now:x}_{sequence:x}")
}