quantrs2_anneal/scientific_performance_optimization/
distributed.rs1use std::collections::{HashMap, VecDeque};
7use std::time::Instant;
8
9use super::config::{
10 ClusterConfig, CommunicationProtocol, DistributedComputingConfig, NodeResources,
11};
12
13pub struct DistributedCoordinator {
15 pub config: DistributedComputingConfig,
17 pub cluster_manager: ClusterManager,
19 pub communication_manager: CommunicationManager,
21 pub fault_tolerance_manager: FaultToleranceManager,
23}
24
25impl DistributedCoordinator {
26 #[must_use]
28 pub fn new(config: DistributedComputingConfig) -> Self {
29 Self {
30 config,
31 cluster_manager: ClusterManager::new(),
32 communication_manager: CommunicationManager::new(),
33 fault_tolerance_manager: FaultToleranceManager::new(),
34 }
35 }
36
37 #[must_use]
39 pub fn is_enabled(&self) -> bool {
40 self.config.enable_distributed
41 }
42
43 #[must_use]
45 pub fn cluster_size(&self) -> usize {
46 self.cluster_manager.active_nodes.len()
47 }
48}
49
50#[derive(Debug)]
52pub struct ClusterManager {
53 pub config: ClusterConfig,
55 pub active_nodes: HashMap<String, ClusterNode>,
57 pub node_statistics: HashMap<String, NodeStatistics>,
59}
60
61impl ClusterManager {
62 #[must_use]
64 pub fn new() -> Self {
65 Self {
66 config: ClusterConfig::default(),
67 active_nodes: HashMap::new(),
68 node_statistics: HashMap::new(),
69 }
70 }
71
72 pub fn add_node(&mut self, address: String, resources: NodeResources) {
74 let node = ClusterNode {
75 address: address.clone(),
76 resources,
77 status: NodeStatus::Active,
78 current_workload: NodeWorkload::default(),
79 };
80 self.active_nodes.insert(address.clone(), node);
81 self.node_statistics
82 .insert(address, NodeStatistics::default());
83 }
84
85 pub fn remove_node(&mut self, address: &str) -> Option<ClusterNode> {
87 self.node_statistics.remove(address);
88 self.active_nodes.remove(address)
89 }
90
91 #[must_use]
93 pub fn available_nodes(&self) -> Vec<&ClusterNode> {
94 self.active_nodes
95 .values()
96 .filter(|n| n.status == NodeStatus::Active)
97 .collect()
98 }
99
100 pub fn update_node_status(&mut self, address: &str, status: NodeStatus) {
102 if let Some(node) = self.active_nodes.get_mut(address) {
103 node.status = status;
104 }
105 }
106
107 #[must_use]
109 pub fn get_node(&self, address: &str) -> Option<&ClusterNode> {
110 self.active_nodes.get(address)
111 }
112}
113
114impl Default for ClusterManager {
115 fn default() -> Self {
116 Self::new()
117 }
118}
119
120#[derive(Debug)]
122pub struct ClusterNode {
123 pub address: String,
125 pub resources: NodeResources,
127 pub status: NodeStatus,
129 pub current_workload: NodeWorkload,
131}
132
133impl ClusterNode {
134 #[must_use]
136 pub fn is_available(&self) -> bool {
137 self.status == NodeStatus::Active && self.current_workload.cpu_utilization < 0.9
138 }
139
140 #[must_use]
142 pub fn available_capacity(&self) -> f64 {
143 1.0 - self.current_workload.cpu_utilization
144 }
145}
146
147#[derive(Debug, Clone, PartialEq, Eq)]
149pub enum NodeStatus {
150 Active,
152 Busy,
154 Unavailable,
156 Failed,
158 Maintenance,
160}
161
162#[derive(Debug, Clone)]
164pub struct NodeWorkload {
165 pub active_tasks: Vec<String>,
167 pub cpu_utilization: f64,
169 pub memory_utilization: f64,
171 pub network_utilization: f64,
173}
174
175impl Default for NodeWorkload {
176 fn default() -> Self {
177 Self {
178 active_tasks: Vec::new(),
179 cpu_utilization: 0.0,
180 memory_utilization: 0.0,
181 network_utilization: 0.0,
182 }
183 }
184}
185
186impl NodeWorkload {
187 #[must_use]
189 pub fn overall_load(&self) -> f64 {
190 (self.cpu_utilization + self.memory_utilization + self.network_utilization) / 3.0
191 }
192}
193
194#[derive(Debug)]
196pub struct CommunicationManager {
197 pub protocol: CommunicationProtocol,
199 pub connections: HashMap<String, Connection>,
201 pub message_queues: HashMap<String, VecDeque<Message>>,
203 pub statistics: CommunicationStatistics,
205}
206
207impl CommunicationManager {
208 #[must_use]
210 pub fn new() -> Self {
211 Self {
212 protocol: CommunicationProtocol::TCP,
213 connections: HashMap::new(),
214 message_queues: HashMap::new(),
215 statistics: CommunicationStatistics::default(),
216 }
217 }
218
219 pub fn connect(&mut self, address: &str) -> Result<(), String> {
221 if self.connections.contains_key(address) {
222 return Ok(());
223 }
224
225 let connection = Connection {
226 id: format!("conn_{}", self.connections.len()),
227 remote_address: address.to_string(),
228 status: ConnectionStatus::Active,
229 statistics: ConnectionStatistics::default(),
230 };
231
232 self.connections.insert(address.to_string(), connection);
233 self.message_queues
234 .insert(address.to_string(), VecDeque::new());
235 self.statistics.connections_established += 1;
236
237 Ok(())
238 }
239
240 pub fn disconnect(&mut self, address: &str) {
242 if let Some(mut conn) = self.connections.remove(address) {
243 conn.status = ConnectionStatus::Disconnected;
244 self.statistics.connections_closed += 1;
245 }
246 self.message_queues.remove(address);
247 }
248
249 pub fn send(&mut self, destination: &str, message: Message) -> Result<(), String> {
251 if let Some(queue) = self.message_queues.get_mut(destination) {
252 queue.push_back(message);
253 self.statistics.messages_sent += 1;
254 Ok(())
255 } else {
256 Err(format!("No connection to {destination}"))
257 }
258 }
259
260 pub fn receive(&mut self, source: &str) -> Vec<Message> {
262 let mut messages = Vec::new();
263 if let Some(queue) = self.message_queues.get_mut(source) {
264 while let Some(msg) = queue.pop_front() {
265 messages.push(msg);
266 self.statistics.messages_received += 1;
267 }
268 }
269 messages
270 }
271}
272
273impl Default for CommunicationManager {
274 fn default() -> Self {
275 Self::new()
276 }
277}
278
279#[derive(Debug)]
281pub struct Connection {
282 pub id: String,
284 pub remote_address: String,
286 pub status: ConnectionStatus,
288 pub statistics: ConnectionStatistics,
290}
291
292#[derive(Debug, Clone, PartialEq, Eq)]
294pub enum ConnectionStatus {
295 Active,
297 Connecting,
299 Disconnected,
301 Failed,
303}
304
305#[derive(Debug, Clone)]
307pub struct Message {
308 pub id: String,
310 pub source: String,
312 pub destination: String,
314 pub message_type: MessageType,
316 pub payload: Vec<u8>,
318 pub timestamp: Instant,
320}
321
322impl Message {
323 #[must_use]
325 pub fn new(
326 source: String,
327 destination: String,
328 message_type: MessageType,
329 payload: Vec<u8>,
330 ) -> Self {
331 Self {
332 id: uuid_simple(),
333 source,
334 destination,
335 message_type,
336 payload,
337 timestamp: Instant::now(),
338 }
339 }
340}
341
342#[derive(Debug, Clone, PartialEq, Eq)]
344pub enum MessageType {
345 TaskAssignment,
347 TaskResult,
349 Heartbeat,
351 StatusUpdate,
353 Error,
355 Control,
357}
358
359#[derive(Debug, Clone, Default)]
361pub struct FaultToleranceManager {
362 pub failed_nodes: Vec<String>,
364 pub recovery_attempts: HashMap<String, u32>,
366 pub checkpoints: HashMap<String, Checkpoint>,
368}
369
370impl FaultToleranceManager {
371 #[must_use]
373 pub fn new() -> Self {
374 Self {
375 failed_nodes: Vec::new(),
376 recovery_attempts: HashMap::new(),
377 checkpoints: HashMap::new(),
378 }
379 }
380
381 pub fn record_failure(&mut self, node_address: &str) {
383 if !self.failed_nodes.contains(&node_address.to_string()) {
384 self.failed_nodes.push(node_address.to_string());
385 }
386 *self
387 .recovery_attempts
388 .entry(node_address.to_string())
389 .or_insert(0) += 1;
390 }
391
392 pub fn record_recovery(&mut self, node_address: &str) {
394 self.failed_nodes.retain(|n| n != node_address);
395 }
396
397 pub fn create_checkpoint(&mut self, task_id: &str, data: Vec<u8>) {
399 let checkpoint = Checkpoint {
400 task_id: task_id.to_string(),
401 data,
402 timestamp: Instant::now(),
403 };
404 self.checkpoints.insert(task_id.to_string(), checkpoint);
405 }
406
407 #[must_use]
409 pub fn get_checkpoint(&self, task_id: &str) -> Option<&Checkpoint> {
410 self.checkpoints.get(task_id)
411 }
412}
413
414#[derive(Debug, Clone)]
416pub struct Checkpoint {
417 pub task_id: String,
419 pub data: Vec<u8>,
421 pub timestamp: Instant,
423}
424
425#[derive(Debug, Clone, Default)]
429pub struct NodeStatistics {
430 pub tasks_completed: u64,
432 pub tasks_failed: u64,
434 pub total_execution_time: std::time::Duration,
436 pub avg_task_time: std::time::Duration,
438}
439
440#[derive(Debug, Clone, Default)]
442pub struct ConnectionStatistics {
443 pub bytes_sent: u64,
445 pub bytes_received: u64,
447 pub messages_sent: u64,
449 pub messages_received: u64,
451 pub errors: u64,
453}
454
455#[derive(Debug, Clone, Default)]
457pub struct CommunicationStatistics {
458 pub connections_established: u64,
460 pub connections_closed: u64,
462 pub messages_sent: u64,
464 pub messages_received: u64,
466 pub total_bytes: u64,
468}
469
470fn uuid_simple() -> String {
472 use std::time::SystemTime;
473 let now = SystemTime::now()
474 .duration_since(SystemTime::UNIX_EPOCH)
475 .map(|d| d.as_nanos())
476 .unwrap_or(0);
477 format!("msg_{now:x}")
478}