use std::collections::{HashMap, HashSet};
use std::hash::Hash;
use std::net::SocketAddr;
use std::sync::{Arc, Mutex, RwLock};
use std::thread;
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};

pub type MessageHandlerFn =
    Box<dyn Fn(&DistributedMessage) -> Result<MessageResponse, DistributedError> + Send + Sync>;
#[derive(Debug, Clone)]
pub struct ResourceReservation {
    pub id: String,
    pub node_id: String,
    pub start_time: Instant,
    pub duration: Duration,
    pub resources: ResourceAllocation,
}
#[derive(Debug, Clone, PartialEq)]
pub enum ConsensusState {
    Follower,
    Candidate,
    Leader,
}
#[derive(Debug, Clone)]
pub struct ClusterConfig {
    pub heartbeat_interval: Duration,
    pub node_timeout: Duration,
    pub job_timeout: Duration,
    pub max_retries: u32,
    pub load_threshold: f64,
    pub replication_factor: u32,
}
#[derive(Debug, Clone)]
pub enum MessagePriority {
    Low,
    Normal,
    High,
    Critical,
}
#[derive(Debug, Clone)]
pub struct MessageResponse {
    pub message_id: String,
    pub success: bool,
    pub data: Vec<u8>,
    pub error: Option<String>,
}
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
pub enum JobPriority {
    Low,
    Normal,
    High,
    Critical,
}
#[derive(Debug, Clone, PartialEq)]
pub enum JobStatus {
    Pending,
    Running,
    Completed,
    Failed,
    Cancelled,
    Timeout,
}
#[derive(Debug, Clone)]
pub struct SchedulingDecision {
    pub job_id: String,
    pub node_id: String,
    pub estimated_start_time: Instant,
    pub resource_allocation: ResourceAllocation,
}
#[derive(Debug, thiserror::Error)]
pub enum DistributedError {
    #[error("Node not found")]
    NodeNotFound,
    #[error("Job not found")]
    JobNotFound,
    #[error("Insufficient resources")]
    InsufficientResources,
    #[error("Node unreachable")]
    NodeUnreachable,
    #[error("Job timeout")]
    JobTimeout,
    #[error("Scheduling error: {0}")]
    SchedulingError(String),
    #[error("Communication error: {0}")]
    CommunicationError(String),
}
#[derive(Debug, Clone)]
pub struct VoteRequest {
    pub term: u64,
    pub candidate_id: String,
    pub last_log_index: usize,
    pub last_log_term: u64,
}
#[derive(Debug, Clone)]
pub struct VoteResponse {
    pub term: u64,
    pub vote_granted: bool,
}
#[allow(dead_code)]
pub struct MessagePassingSystem {
    node_id: String,
    message_handlers: HashMap<String, MessageHandler>,
    pending_messages: Arc<Mutex<Vec<DistributedMessage>>>,
    pub(crate) message_queue: Arc<Mutex<Vec<DistributedMessage>>>,
    pub(crate) routing_table: Arc<RwLock<HashMap<String, SocketAddr>>>,
}
impl MessagePassingSystem {
    pub fn new(node_id: String) -> Self {
        Self {
            node_id,
            message_handlers: HashMap::new(),
            pending_messages: Arc::new(Mutex::new(Vec::new())),
            message_queue: Arc::new(Mutex::new(Vec::new())),
            routing_table: Arc::new(RwLock::new(HashMap::new())),
        }
    }
    pub fn send_message(&self, message: DistributedMessage) -> Result<(), DistributedError> {
        let routing_table = self.routing_table.read().unwrap();
        if routing_table.contains_key(&message.destination) {
            self.message_queue.lock().unwrap().push(message);
            Ok(())
        } else {
            Err(DistributedError::NodeUnreachable)
        }
    }
    pub fn broadcast_message(
        &self,
        message_type: MessageType,
        data: Vec<u8>,
    ) -> Result<(), DistributedError> {
        // Collect destinations first so the routing-table read lock is released
        // before `send_message` re-acquires it; re-locking while still holding the
        // guard can deadlock once a writer is queued on the same RwLock.
        let destinations: Vec<String> = {
            let routing_table = self.routing_table.read().unwrap();
            routing_table
                .keys()
                .filter(|&node_id| node_id != &self.node_id)
                .cloned()
                .collect()
        };
        for node_id in destinations {
            let message = DistributedMessage {
                // Wall-clock milliseconds give a usable message id;
                // `Instant::now().elapsed()` is always ~0 and produced duplicate ids.
                id: format!(
                    "{}_{}",
                    self.node_id,
                    SystemTime::now()
                        .duration_since(UNIX_EPOCH)
                        .unwrap_or_default()
                        .as_millis()
                ),
                source: self.node_id.clone(),
                destination: node_id,
                message_type: message_type.clone(),
                data: data.clone(),
                timestamp: Instant::now(),
                priority: MessagePriority::Normal,
            };
            self.send_message(message)?;
        }
        Ok(())
    }
    pub fn register_handler(&mut self, message_type: String, handler: MessageHandler) {
        self.message_handlers.insert(message_type, handler);
    }
    pub fn process_messages(&self) -> Result<Vec<MessageResponse>, DistributedError> {
        let mut responses = Vec::new();
        let mut queue = self.message_queue.lock().unwrap();
        for message in queue.drain(..) {
            // Handlers are keyed by the `Debug` form of `MessageType`
            // (e.g. "Heartbeat"); the type does not implement `Display`.
            if let Some(handler) = self
                .message_handlers
                .get(&format!("{:?}", message.message_type))
            {
                let response = handler.handle(&message)?;
                responses.push(response);
            }
        }
        Ok(responses)
    }
}
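#[cfg(test)]
mod message_passing_example {
    use super::*;

    // Illustrative sketch, not part of the original API surface: registers a handler
    // under the Debug key "Heartbeat", adds a made-up peer ("node-b", 127.0.0.1:9000)
    // to the routing table, broadcasts once, and drains the local queue.
    #[test]
    fn heartbeat_round_trip() -> Result<(), DistributedError> {
        let mut system = MessagePassingSystem::new("node-a".to_string());
        system.register_handler(
            "Heartbeat".to_string(),
            MessageHandler::new(|message| {
                Ok(MessageResponse {
                    message_id: message.id.clone(),
                    success: true,
                    data: Vec::new(),
                    error: None,
                })
            }),
        );
        system
            .routing_table
            .write()
            .unwrap()
            .insert("node-b".to_string(), "127.0.0.1:9000".parse().unwrap());
        system.broadcast_message(MessageType::Heartbeat, Vec::new())?;
        let responses = system.process_messages()?;
        assert_eq!(responses.len(), 1);
        Ok(())
    }
}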
#[derive(Debug, Clone)]
pub struct PeerInfo {
    pub id: String,
    pub next_index: usize,
    pub match_index: usize,
    pub last_response: Instant,
}
#[allow(dead_code)]
pub struct AdvancedJobScheduler {
    scheduling_policies: Vec<SchedulingPolicy>,
    pub(crate) resource_reservations: HashMap<String, ResourceReservation>,
    job_dependencies: HashMap<String, Vec<String>>,
    priority_queues: HashMap<JobPriority, Vec<DistributedJob>>,
    backfill_enabled: bool,
}
impl AdvancedJobScheduler {
    pub fn new() -> Self {
        Self {
            scheduling_policies: vec![
                SchedulingPolicy::FIFO,
                SchedulingPolicy::ShortestJobFirst,
                SchedulingPolicy::GangScheduling,
            ],
            resource_reservations: HashMap::new(),
            job_dependencies: HashMap::new(),
            priority_queues: HashMap::new(),
            backfill_enabled: true,
        }
    }
    pub fn gang_schedule(
        &mut self,
        jobs: &[DistributedJob],
        nodes: &HashMap<String, ClusterNode>,
    ) -> Result<Vec<SchedulingDecision>, DistributedError> {
        let mut decisions = Vec::new();
        let job_groups = self.group_related_jobs(jobs);
        for group in job_groups {
            if let Some(node_assignment) = self.find_gang_assignment(&group, nodes) {
                for (job, node_id) in group.iter().zip(node_assignment.iter()) {
                    decisions.push(SchedulingDecision {
                        job_id: job.id.clone(),
                        node_id: node_id.clone(),
                        estimated_start_time: Instant::now(),
                        resource_allocation: self
                            .calculate_resource_allocation(job, node_id, nodes),
                    });
                }
            }
        }
        Ok(decisions)
    }
    pub fn backfill_schedule(
        &mut self,
        waiting_jobs: &[DistributedJob],
        nodes: &HashMap<String, ClusterNode>,
    ) -> Result<Vec<SchedulingDecision>, DistributedError> {
        let mut decisions = Vec::new();
        if !self.backfill_enabled {
            return Ok(decisions);
        }
        for job in waiting_jobs {
            for (node_id, node) in nodes {
                if self.can_backfill_job(job, node) {
                    decisions.push(SchedulingDecision {
                        job_id: job.id.clone(),
                        node_id: node_id.clone(),
                        estimated_start_time: Instant::now(),
                        resource_allocation: self
                            .calculate_resource_allocation(job, node_id, nodes),
                    });
                    break;
                }
            }
        }
        Ok(decisions)
    }
    pub fn reserve_resources(
        &mut self,
        reservation: ResourceReservation,
    ) -> Result<(), DistributedError> {
        self.resource_reservations
            .insert(reservation.id.clone(), reservation);
        Ok(())
    }
    // Placeholder grouping: treats the whole batch as a single gang.
    fn group_related_jobs(&self, jobs: &[DistributedJob]) -> Vec<Vec<DistributedJob>> {
        vec![jobs.to_vec()]
    }
    // Placeholder assignment: takes the first N node ids without checking capacity.
    fn find_gang_assignment(
        &self,
        job_group: &[DistributedJob],
        nodes: &HashMap<String, ClusterNode>,
    ) -> Option<Vec<String>> {
        if job_group.len() <= nodes.len() {
            Some(nodes.keys().take(job_group.len()).cloned().collect())
        } else {
            None
        }
    }
    // Placeholder check: every job is considered backfillable.
    fn can_backfill_job(&self, _job: &DistributedJob, _node: &ClusterNode) -> bool {
        true
    }
    fn calculate_resource_allocation(
        &self,
        job: &DistributedJob,
        _node_id: &str,
        _nodes: &HashMap<String, ClusterNode>,
    ) -> ResourceAllocation {
        ResourceAllocation {
            cpu_cores: job.requirements.min_cpu_cores,
            memory_gb: job.requirements.min_memory_gb,
            gpu_count: job.requirements.min_gpu_count,
            storage_gb: job.requirements.min_storage_gb,
            network_bandwidth: 100,
        }
    }
}
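#[cfg(test)]
mod advanced_scheduler_example {
    use super::*;

    // Illustrative sketch (all ids and sizes are made up): reserves a slice of a
    // node's resources and confirms the reservation is tracked by id.
    #[test]
    fn reserve_and_lookup() {
        let mut scheduler = AdvancedJobScheduler::new();
        let reservation = ResourceReservation {
            id: "resv-1".to_string(),
            node_id: "node-a".to_string(),
            start_time: Instant::now(),
            duration: Duration::from_secs(600),
            resources: ResourceAllocation {
                cpu_cores: 4,
                memory_gb: 16,
                gpu_count: 1,
                storage_gb: 100,
                network_bandwidth: 100,
            },
        };
        scheduler.reserve_resources(reservation).unwrap();
        assert!(scheduler.resource_reservations.contains_key("resv-1"));
    }
}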
#[derive(Debug, Clone)]
pub struct PartitionMove {
    pub partition: usize,
    pub from_node: String,
    pub to_node: String,
}
#[derive(Debug, Clone)]
pub struct CheckpointStats {
    pub total_checkpoints: usize,
    pub total_size_bytes: u64,
    pub compressed_checkpoints: usize,
    pub compression_ratio: f64,
}
pub struct JobScheduler {
    scheduling_strategy: SchedulingStrategy,
}
impl JobScheduler {
    pub fn new() -> Self {
        Self {
            scheduling_strategy: SchedulingStrategy::LeastLoaded,
        }
    }
    pub fn find_suitable_node(
        &self,
        job: &DistributedJob,
        nodes: &HashMap<String, ClusterNode>,
    ) -> Option<String> {
        let suitable_nodes: Vec<_> = nodes
            .values()
            .filter(|node| self.can_run_job(node, job))
            .collect();
        if suitable_nodes.is_empty() {
            return None;
        }
        match self.scheduling_strategy {
            SchedulingStrategy::LeastLoaded => suitable_nodes
                .iter()
                .min_by(|a, b| {
                    let load_a = a.load_metrics.cpu_usage + a.load_metrics.memory_usage;
                    let load_b = b.load_metrics.cpu_usage + b.load_metrics.memory_usage;
                    load_a
                        .partial_cmp(&load_b)
                        .unwrap_or(std::cmp::Ordering::Equal)
                })
                .map(|node| node.id.clone()),
            // Simplified round-robin: always picks the first suitable node rather
            // than rotating through them.
            SchedulingStrategy::RoundRobin => suitable_nodes.first().map(|node| node.id.clone()),
            SchedulingStrategy::HighestCapacity => suitable_nodes
                .iter()
                .max_by_key(|node| node.capabilities.cpu_cores * node.capabilities.memory_gb)
                .map(|node| node.id.clone()),
        }
    }
    fn can_run_job(&self, node: &ClusterNode, job: &DistributedJob) -> bool {
        node.status == NodeStatus::Available
            && node.capabilities.cpu_cores >= job.requirements.min_cpu_cores
            && node.capabilities.memory_gb >= job.requirements.min_memory_gb
            && node.capabilities.gpu_count >= job.requirements.min_gpu_count
            && node.capabilities.storage_gb >= job.requirements.min_storage_gb
    }
}
pub struct CheckpointManager {
    pub(crate) checkpoint_storage: HashMap<String, Checkpoint>,
    #[allow(dead_code)]
    checkpoint_interval: Duration,
    compression_enabled: bool,
}
impl CheckpointManager {
    pub fn new(checkpoint_interval: Duration) -> Self {
        Self {
            checkpoint_storage: HashMap::new(),
            checkpoint_interval,
            compression_enabled: true,
        }
    }
    pub fn create_checkpoint(
        &mut self,
        job_id: &str,
        state: JobState,
    ) -> Result<String, DistributedError> {
        // Wall-clock milliseconds keep checkpoint ids distinct;
        // `Instant::now().elapsed()` is always ~0 and produced colliding ids.
        let checkpoint_id = format!(
            "{job_id}_{}",
            SystemTime::now()
                .duration_since(UNIX_EPOCH)
                .unwrap_or_default()
                .as_millis()
        );
        let checkpoint = Checkpoint {
            id: checkpoint_id.clone(),
            job_id: job_id.to_string(),
            state,
            created_at: Instant::now(),
            size_bytes: 1024,
            compressed: self.compression_enabled,
        };
        self.checkpoint_storage
            .insert(checkpoint_id.clone(), checkpoint);
        Ok(checkpoint_id)
    }
    pub fn restore_checkpoint(&self, checkpoint_id: &str) -> Result<JobState, DistributedError> {
        self.checkpoint_storage
            .get(checkpoint_id)
            .map(|checkpoint| checkpoint.state.clone())
            .ok_or(DistributedError::JobNotFound)
    }
    pub fn cleanup_old_checkpoints(&mut self, retention_period: Duration) {
        // `checked_sub` avoids the panic `Instant - Duration` can raise when the
        // retention period reaches back before the earliest representable instant.
        if let Some(cutoff) = Instant::now().checked_sub(retention_period) {
            self.checkpoint_storage
                .retain(|_, checkpoint| checkpoint.created_at > cutoff);
        }
    }
    pub fn get_checkpoint_stats(&self) -> CheckpointStats {
        let total_checkpoints = self.checkpoint_storage.len();
        let total_size: u64 = self.checkpoint_storage.values().map(|c| c.size_bytes).sum();
        let compressed_checkpoints = self
            .checkpoint_storage
            .values()
            .filter(|c| c.compressed)
            .count();
        CheckpointStats {
            total_checkpoints,
            total_size_bytes: total_size,
            compressed_checkpoints,
            compression_ratio: if total_checkpoints > 0 {
                compressed_checkpoints as f64 / total_checkpoints as f64
            } else {
                0.0
            },
        }
    }
}
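#[cfg(test)]
mod checkpoint_example {
    use super::*;

    // Illustrative sketch with made-up values: checkpoints an in-flight job's state
    // and restores it by the returned id.
    #[test]
    fn create_and_restore() {
        let mut manager = CheckpointManager::new(Duration::from_secs(60));
        let checkpoint_id = manager
            .create_checkpoint(
                "job-1",
                JobState {
                    progress: 0.5,
                    intermediate_results: HashMap::new(),
                    runtime_state: vec![1, 2, 3],
                },
            )
            .unwrap();
        let restored = manager.restore_checkpoint(&checkpoint_id).unwrap();
        assert_eq!(restored.runtime_state, vec![1u8, 2, 3]);
    }
}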
#[derive(Debug, Clone)]
pub struct ClusterNode {
    pub id: String,
    pub address: SocketAddr,
    pub capabilities: NodeCapabilities,
    pub status: NodeStatus,
    pub last_heartbeat: Instant,
    pub load_metrics: LoadMetrics,
    pub job_history: Vec<JobExecution>,
}
#[derive(Debug, Clone)]
pub struct LogEntry {
    pub term: u64,
    pub index: usize,
    pub command: String,
    pub data: Vec<u8>,
}
#[derive(Debug, Clone)]
pub struct DistributedMessage {
    pub id: String,
    pub source: String,
    pub destination: String,
    pub message_type: MessageType,
    pub data: Vec<u8>,
    pub timestamp: Instant,
    pub priority: MessagePriority,
}
#[derive(Debug, Clone)]
pub struct JobState {
    pub progress: f64,
    pub intermediate_results: HashMap<String, Vec<u8>>,
    pub runtime_state: Vec<u8>,
}
pub struct DataPartitioner {
    partitioning_strategy: PartitioningStrategy,
    partition_count: usize,
    node_assignments: HashMap<usize, String>,
    replication_factor: usize,
}
impl DataPartitioner {
    pub fn new(
        strategy: PartitioningStrategy,
        partition_count: usize,
        replication_factor: usize,
    ) -> Self {
        Self {
            partitioning_strategy: strategy,
            partition_count,
            node_assignments: HashMap::new(),
            replication_factor,
        }
    }
    pub fn get_partition(&self, key: &str) -> usize {
        match self.partitioning_strategy {
            PartitioningStrategy::Hash => {
                use std::collections::hash_map::DefaultHasher;
                use std::hash::Hasher;
                let mut hasher = DefaultHasher::new();
                key.hash(&mut hasher);
                (hasher.finish() as usize) % self.partition_count
            }
            // Range and Random are placeholders that bucket by key length.
            PartitioningStrategy::Range => key.len() % self.partition_count,
            PartitioningStrategy::Random => key.len() % self.partition_count,
        }
    }
    pub fn get_partition_nodes(&self, partition: usize) -> Vec<String> {
        let mut nodes = Vec::new();
        if let Some(primary_node) = self.node_assignments.get(&partition) {
            nodes.push(primary_node.clone());
            for i in 1..self.replication_factor {
                let replica_partition = (partition + i) % self.partition_count;
                if let Some(replica_node) = self.node_assignments.get(&replica_partition) {
                    if !nodes.contains(replica_node) {
                        nodes.push(replica_node.clone());
                    }
                }
            }
        }
        nodes
    }
    pub fn assign_partition(&mut self, partition: usize, node_id: String) {
        self.node_assignments.insert(partition, node_id);
    }
    pub fn rebalance_partitions(&mut self, available_nodes: &[String]) -> PartitioningResult {
        // Guard against an empty node list, which would otherwise divide by zero below.
        if available_nodes.is_empty() {
            return PartitioningResult {
                assignments_changed: 0,
                partitions_moved: Vec::new(),
                rebalance_time: Instant::now(),
            };
        }
        let mut assignments_changed = 0;
        let mut partitions_moved = Vec::new();
        for partition in 0..self.partition_count {
            let optimal_node = &available_nodes[partition % available_nodes.len()];
            if let Some(current_node) = self.node_assignments.get(&partition) {
                if current_node != optimal_node {
                    partitions_moved.push(PartitionMove {
                        partition,
                        from_node: current_node.clone(),
                        to_node: optimal_node.clone(),
                    });
                    self.node_assignments
                        .insert(partition, optimal_node.clone());
                    assignments_changed += 1;
                }
            } else {
                self.node_assignments
                    .insert(partition, optimal_node.clone());
                assignments_changed += 1;
            }
        }
        PartitioningResult {
            assignments_changed,
            partitions_moved,
            rebalance_time: Instant::now(),
        }
    }
}
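#[cfg(test)]
mod partitioner_example {
    use super::*;

    // Illustrative sketch with made-up node ids: hash-partitions a key across four
    // partitions owned by two nodes and asks for the primary plus one replica.
    #[test]
    fn hash_partition_with_replicas() {
        let mut partitioner = DataPartitioner::new(PartitioningStrategy::Hash, 4, 2);
        for partition in 0..4 {
            let node = if partition % 2 == 0 { "node-a" } else { "node-b" };
            partitioner.assign_partition(partition, node.to_string());
        }
        let partition = partitioner.get_partition("training-shard-17");
        assert!(partition < 4);
        // Primary owner plus the next partition's owner, deduplicated.
        let owners = partitioner.get_partition_nodes(partition);
        assert_eq!(owners.len(), 2);
        assert!(owners.contains(&"node-a".to_string()));
    }
}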
#[derive(Debug, Clone)]
pub struct ClusterStats {
    pub total_nodes: usize,
    pub available_nodes: usize,
    pub busy_nodes: usize,
    pub overloaded_nodes: usize,
    pub unreachable_nodes: usize,
    pub total_jobs: usize,
    pub running_jobs: usize,
    pub completed_jobs: usize,
    pub failed_jobs: usize,
    pub queued_jobs: usize,
    pub total_cpu_cores: u32,
    pub total_memory_gb: u32,
    pub total_gpu_count: u32,
    pub avg_cpu_usage: f64,
    pub avg_memory_usage: f64,
}
pub struct MessageHandler {
    pub handler_fn: MessageHandlerFn,
}
impl MessageHandler {
    pub fn new<F>(f: F) -> Self
    where
        F: Fn(&DistributedMessage) -> Result<MessageResponse, DistributedError>
            + Send
            + Sync
            + 'static,
    {
        Self {
            handler_fn: Box::new(f),
        }
    }
    pub fn handle(
        &self,
        message: &DistributedMessage,
    ) -> Result<MessageResponse, DistributedError> {
        (self.handler_fn)(message)
    }
}
pub struct FaultDetector {
    failure_history: HashMap<String, Vec<Instant>>,
}
impl FaultDetector {
    pub fn new() -> Self {
        Self {
            failure_history: HashMap::new(),
        }
    }
    pub fn handle_failure(&mut self, node_id: &str) -> Result<(), DistributedError> {
        let failures = self.failure_history.entry(node_id.to_string()).or_default();
        failures.push(Instant::now());
        // Only count failures from the last hour; `checked_sub` avoids a panic when
        // the process has been up for less than that window.
        if let Some(cutoff) = Instant::now().checked_sub(Duration::from_secs(3600)) {
            failures.retain(|&failure_time| failure_time > cutoff);
        }
        if failures.len() > 3 {
            println!("Node {node_id} has too many failures, marking as problematic");
        }
        Ok(())
    }
    pub fn is_problematic(&self, node_id: &str) -> bool {
        self.failure_history
            .get(node_id)
            .map(|failures| failures.len() > 3)
            .unwrap_or(false)
    }
}
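#[cfg(test)]
mod fault_detector_example {
    use super::*;

    // Illustrative sketch: a node only becomes "problematic" after more than three
    // failures inside the one-hour window tracked by `FaultDetector`.
    #[test]
    fn repeated_failures_flag_node() {
        let mut detector = FaultDetector::new();
        for _ in 0..4 {
            detector.handle_failure("node-a").unwrap();
        }
        assert!(detector.is_problematic("node-a"));
        assert!(!detector.is_problematic("node-b"));
    }
}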
#[derive(Debug, Clone)]
pub enum MessageType {
    JobSubmission,
    JobResult,
    Heartbeat,
    ResourceUpdate,
    ConsensusRequest,
    DataPartition,
    Custom(String),
}
pub struct LoadBalancer {
    rebalance_threshold: f64,
}
impl LoadBalancer {
    pub fn new() -> Self {
        Self {
            rebalance_threshold: 0.8,
        }
    }
    pub fn rebalance(&self, nodes: &HashMap<String, ClusterNode>) -> Result<(), DistributedError> {
        let overloaded_nodes: Vec<_> = nodes
            .values()
            .filter(|node| {
                let total_load = node.load_metrics.cpu_usage + node.load_metrics.memory_usage;
                total_load > self.rebalance_threshold * 2.0
            })
            .collect();
        let underloaded_nodes: Vec<_> = nodes
            .values()
            .filter(|node| {
                let total_load = node.load_metrics.cpu_usage + node.load_metrics.memory_usage;
                total_load < self.rebalance_threshold
            })
            .collect();
        println!(
            "Rebalancing: {} overloaded nodes, {} underloaded nodes",
            overloaded_nodes.len(),
            underloaded_nodes.len()
        );
        Ok(())
    }
}
#[derive(Debug, Clone, PartialEq)]
pub enum NodeStatus {
    Available,
    Busy,
    Overloaded,
    Unreachable,
    Maintenance,
}
#[derive(Debug, Clone, PartialEq)]
pub enum JobType {
    Training,
    Inference,
    DataProcessing,
    ModelEvaluation,
    Hyperparameter,
    Custom(String),
}
#[derive(Debug, Clone)]
pub struct Checkpoint {
    pub id: String,
    pub job_id: String,
    pub state: JobState,
    pub created_at: Instant,
    pub size_bytes: u64,
    pub compressed: bool,
}
#[derive(Debug, Clone)]
pub struct LoadMetrics {
    pub cpu_usage: f64,
    pub memory_usage: f64,
    pub gpu_usage: f64,
    pub network_io: f64,
    pub disk_io: f64,
    pub active_jobs: u32,
    pub queue_size: u32,
}
#[derive(Debug, Clone)]
pub struct DistributedJob {
    pub id: String,
    pub name: String,
    pub job_type: JobType,
    pub priority: JobPriority,
    pub requirements: ResourceRequirements,
    pub created_at: Instant,
    pub timeout: Duration,
    pub retry_count: u32,
    pub dependencies: Vec<String>,
    pub metadata: HashMap<String, String>,
}
#[derive(Debug, Clone)]
pub enum SchedulingStrategy {
    LeastLoaded,
    RoundRobin,
    HighestCapacity,
}
#[derive(Debug, Clone)]
pub struct ResourceUsage {
    pub cpu_time: Duration,
    pub memory_peak: u64,
    pub gpu_time: Duration,
    pub network_bytes: u64,
    pub disk_bytes: u64,
}
#[derive(Debug, Clone)]
pub struct NodeCapabilities {
    pub cpu_cores: u32,
    pub memory_gb: u32,
    pub gpu_count: u32,
    pub storage_gb: u32,
    pub network_bandwidth_mbps: u32,
    pub supported_tasks: HashSet<String>,
}
#[allow(dead_code)]
pub struct ConsensusManager {
    node_id: String,
    pub(crate) state: ConsensusState,
    term: u64,
    voted_for: Option<String>,
    pub(crate) log: Vec<LogEntry>,
    commit_index: usize,
    last_applied: usize,
    peers: HashMap<String, PeerInfo>,
}
impl ConsensusManager {
    pub fn new(node_id: String, peers: Vec<String>) -> Self {
        let mut peer_map = HashMap::new();
        for peer in peers {
            peer_map.insert(
                peer.clone(),
                PeerInfo {
                    id: peer,
                    next_index: 0,
                    match_index: 0,
                    last_response: Instant::now(),
                },
            );
        }
        Self {
            node_id,
            state: ConsensusState::Follower,
            term: 0,
            voted_for: None,
            log: Vec::new(),
            commit_index: 0,
            last_applied: 0,
            peers: peer_map,
        }
    }
    pub fn start_election(&mut self) -> Result<(), DistributedError> {
        self.state = ConsensusState::Candidate;
        self.term += 1;
        self.voted_for = Some(self.node_id.clone());
        println!(
            "Node {} starting election for term {}",
            self.node_id, self.term
        );
        Ok(())
    }
    // Simplified Raft vote handling: the log up-to-dateness check on
    // `last_log_index`/`last_log_term` is omitted.
    pub fn handle_vote_request(&mut self, request: VoteRequest) -> VoteResponse {
        let grant_vote = if request.term > self.term {
            self.term = request.term;
            self.state = ConsensusState::Follower;
            // Record the vote so only one candidate can win this term.
            self.voted_for = Some(request.candidate_id.clone());
            true
        } else if request.term == self.term
            && (self.voted_for.is_none() || self.voted_for.as_ref() == Some(&request.candidate_id))
        {
            self.voted_for = Some(request.candidate_id.clone());
            true
        } else {
            false
        };
        VoteResponse {
            term: self.term,
            vote_granted: grant_vote,
        }
    }
    pub fn append_entry(&mut self, entry: LogEntry) -> Result<(), DistributedError> {
        if self.state != ConsensusState::Leader {
            return Err(DistributedError::SchedulingError("Not leader".to_string()));
        }
        self.log.push(entry);
        Ok(())
    }
    pub fn get_state(&self) -> (ConsensusState, u64) {
        (self.state.clone(), self.term)
    }
}
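#[cfg(test)]
mod consensus_example {
    use super::*;

    // Illustrative sketch with made-up node ids: node-b sees a vote request from a
    // candidate with a higher term, grants its vote, and adopts the new term.
    #[test]
    fn follower_grants_vote_for_higher_term() {
        let mut candidate =
            ConsensusManager::new("node-a".to_string(), vec!["node-b".to_string()]);
        candidate.start_election().unwrap();
        let (state, term) = candidate.get_state();
        assert_eq!(state, ConsensusState::Candidate);

        let mut follower =
            ConsensusManager::new("node-b".to_string(), vec!["node-a".to_string()]);
        let response = follower.handle_vote_request(VoteRequest {
            term,
            candidate_id: "node-a".to_string(),
            last_log_index: 0,
            last_log_term: 0,
        });
        assert!(response.vote_granted);
        assert_eq!(response.term, term);
    }
}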
#[derive(Debug, Clone)]
pub struct PartitioningResult {
    pub assignments_changed: usize,
    pub partitions_moved: Vec<PartitionMove>,
    pub rebalance_time: Instant,
}
#[derive(Debug, Clone)]
pub enum PartitioningStrategy {
    Hash,
    Range,
    Random,
}
#[derive(Debug, Clone)]
pub enum SchedulingPolicy {
    FIFO,
    ShortestJobFirst,
    GangScheduling,
    Backfill,
    PriorityBased,
}
pub struct DistributedCluster {
    nodes: Arc<RwLock<HashMap<String, ClusterNode>>>,
    jobs: Arc<RwLock<HashMap<String, DistributedJob>>>,
    executions: Arc<RwLock<HashMap<String, JobExecution>>>,
    pub(crate) job_queue: Arc<Mutex<Vec<DistributedJob>>>,
    scheduler: Arc<Mutex<JobScheduler>>,
    load_balancer: Arc<Mutex<LoadBalancer>>,
    fault_detector: Arc<Mutex<FaultDetector>>,
    config: ClusterConfig,
}
impl DistributedCluster {
    pub fn new(config: ClusterConfig) -> Self {
        Self {
            nodes: Arc::new(RwLock::new(HashMap::new())),
            jobs: Arc::new(RwLock::new(HashMap::new())),
            executions: Arc::new(RwLock::new(HashMap::new())),
            job_queue: Arc::new(Mutex::new(Vec::new())),
            scheduler: Arc::new(Mutex::new(JobScheduler::new())),
            load_balancer: Arc::new(Mutex::new(LoadBalancer::new())),
            fault_detector: Arc::new(Mutex::new(FaultDetector::new())),
            config,
        }
    }
    pub fn register_node(&self, node: ClusterNode) -> Result<(), DistributedError> {
        let mut nodes = self.nodes.write().unwrap();
        nodes.insert(node.id.clone(), node);
        Ok(())
    }
    pub fn remove_node(&self, node_id: &str) -> Result<(), DistributedError> {
        let mut nodes = self.nodes.write().unwrap();
        nodes
            .remove(node_id)
            .ok_or(DistributedError::NodeNotFound)?;
        Ok(())
    }
    pub fn get_nodes(&self) -> Vec<ClusterNode> {
        self.nodes.read().unwrap().values().cloned().collect()
    }
    pub fn get_available_nodes(&self) -> Vec<ClusterNode> {
        self.nodes
            .read()
            .unwrap()
            .values()
            .filter(|node| node.status == NodeStatus::Available)
            .cloned()
            .collect()
    }
    pub fn submit_job(&self, job: DistributedJob) -> Result<String, DistributedError> {
        let job_id = job.id.clone();
        self.jobs
            .write()
            .unwrap()
            .insert(job_id.clone(), job.clone());
        self.job_queue.lock().unwrap().push(job);
        self.schedule_jobs()?;
        Ok(job_id)
    }
    pub fn schedule_jobs(&self) -> Result<(), DistributedError> {
        let scheduler = self.scheduler.lock().unwrap();
        let mut queue = self.job_queue.lock().unwrap();
        let nodes = self.nodes.read().unwrap();
        // Highest priority first; ties broken by submission time (oldest first).
        queue.sort_by(|a, b| {
            b.priority
                .cmp(&a.priority)
                .then_with(|| a.created_at.cmp(&b.created_at))
        });
        let mut scheduled_jobs = Vec::new();
        for job in queue.iter() {
            if let Some(node_id) = scheduler.find_suitable_node(job, &nodes) {
                let execution = JobExecution {
                    job_id: job.id.clone(),
                    node_id: node_id.clone(),
                    start_time: Instant::now(),
                    end_time: None,
                    status: JobStatus::Running,
                    progress: 0.0,
                    result: None,
                    error: None,
                    resource_usage: None,
                };
                self.executions
                    .write()
                    .unwrap()
                    .insert(job.id.clone(), execution);
                scheduled_jobs.push(job.id.clone());
            }
        }
        queue.retain(|job| !scheduled_jobs.contains(&job.id));
        Ok(())
    }
    pub fn get_job_status(&self, job_id: &str) -> Option<JobStatus> {
        self.executions
            .read()
            .unwrap()
            .get(job_id)
            .map(|exec| exec.status.clone())
    }
    pub fn get_job_execution(&self, job_id: &str) -> Option<JobExecution> {
        self.executions.read().unwrap().get(job_id).cloned()
    }
    pub fn cancel_job(&self, job_id: &str) -> Result<(), DistributedError> {
        let mut executions = self.executions.write().unwrap();
        if let Some(execution) = executions.get_mut(job_id) {
            execution.status = JobStatus::Cancelled;
            execution.end_time = Some(Instant::now());
            Ok(())
        } else {
            Err(DistributedError::JobNotFound)
        }
    }
    pub fn update_heartbeat(
        &self,
        node_id: &str,
        load_metrics: LoadMetrics,
    ) -> Result<(), DistributedError> {
        let mut nodes = self.nodes.write().unwrap();
        if let Some(node) = nodes.get_mut(node_id) {
            node.last_heartbeat = Instant::now();
            node.load_metrics = load_metrics;
            node.status = self.determine_node_status(&node.load_metrics);
            Ok(())
        } else {
            Err(DistributedError::NodeNotFound)
        }
    }
    fn determine_node_status(&self, metrics: &LoadMetrics) -> NodeStatus {
        if metrics.cpu_usage > 0.9 || metrics.memory_usage > 0.9 {
            NodeStatus::Overloaded
        } else if metrics.cpu_usage > 0.7 || metrics.memory_usage > 0.7 {
            NodeStatus::Busy
        } else {
            NodeStatus::Available
        }
    }
    pub fn get_cluster_stats(&self) -> ClusterStats {
        let nodes = self.nodes.read().unwrap();
        let jobs = self.jobs.read().unwrap();
        let executions = self.executions.read().unwrap();
        let total_nodes = nodes.len();
        let available_nodes = nodes
            .values()
            .filter(|n| n.status == NodeStatus::Available)
            .count();
        let busy_nodes = nodes
            .values()
            .filter(|n| n.status == NodeStatus::Busy)
            .count();
        let overloaded_nodes = nodes
            .values()
            .filter(|n| n.status == NodeStatus::Overloaded)
            .count();
        // Count unreachable nodes directly; deriving them by subtraction would also
        // sweep in nodes that are in maintenance.
        let unreachable_nodes = nodes
            .values()
            .filter(|n| n.status == NodeStatus::Unreachable)
            .count();
        let total_jobs = jobs.len();
        let running_jobs = executions
            .values()
            .filter(|e| e.status == JobStatus::Running)
            .count();
        let completed_jobs = executions
            .values()
            .filter(|e| e.status == JobStatus::Completed)
            .count();
        let failed_jobs = executions
            .values()
            .filter(|e| e.status == JobStatus::Failed)
            .count();
        let total_cpu_cores: u32 = nodes.values().map(|n| n.capabilities.cpu_cores).sum();
        let total_memory_gb: u32 = nodes.values().map(|n| n.capabilities.memory_gb).sum();
        let total_gpu_count: u32 = nodes.values().map(|n| n.capabilities.gpu_count).sum();
        let avg_cpu_usage = if !nodes.is_empty() {
            nodes
                .values()
                .map(|n| n.load_metrics.cpu_usage)
                .sum::<f64>()
                / nodes.len() as f64
        } else {
            0.0
        };
        let avg_memory_usage = if !nodes.is_empty() {
            nodes
                .values()
                .map(|n| n.load_metrics.memory_usage)
                .sum::<f64>()
                / nodes.len() as f64
        } else {
            0.0
        };
        ClusterStats {
            total_nodes,
            available_nodes,
            busy_nodes,
            overloaded_nodes,
            unreachable_nodes,
            total_jobs,
            running_jobs,
            completed_jobs,
            failed_jobs,
            queued_jobs: self.job_queue.lock().unwrap().len(),
            total_cpu_cores,
            total_memory_gb,
            total_gpu_count,
            avg_cpu_usage,
            avg_memory_usage,
        }
    }
    pub fn start_monitoring(&self) -> Result<(), DistributedError> {
        let nodes = Arc::clone(&self.nodes);
        let executions = Arc::clone(&self.executions);
        let config = self.config.clone();
        thread::spawn(move || loop {
            let now = Instant::now();
            // Mark nodes with an overdue heartbeat as unreachable, then release the
            // lock before touching executions so the two write locks are never held
            // at the same time.
            {
                let mut nodes_guard = nodes.write().unwrap();
                for node in nodes_guard.values_mut() {
                    if now.duration_since(node.last_heartbeat) > config.node_timeout {
                        node.status = NodeStatus::Unreachable;
                    }
                }
            }
            {
                let mut executions_guard = executions.write().unwrap();
                for execution in executions_guard.values_mut() {
                    if execution.status == JobStatus::Running
                        && now.duration_since(execution.start_time) > config.job_timeout
                    {
                        execution.status = JobStatus::Timeout;
                        execution.end_time = Some(now);
                    }
                }
            }
            thread::sleep(config.heartbeat_interval);
        });
        Ok(())
    }
    pub fn rebalance_workload(&self) -> Result<(), DistributedError> {
        let load_balancer = self.load_balancer.lock().unwrap();
        let nodes = self.nodes.read().unwrap();
        load_balancer.rebalance(&nodes)?;
        Ok(())
    }
    pub fn handle_node_failure(&self, node_id: &str) -> Result<(), DistributedError> {
        let mut fault_detector = self.fault_detector.lock().unwrap();
        let mut executions = self.executions.write().unwrap();
        for execution in executions.values_mut() {
            if execution.node_id == node_id && execution.status == JobStatus::Running {
                execution.status = JobStatus::Failed;
                execution.end_time = Some(Instant::now());
                execution.error = Some("Node failure".to_string());
            }
        }
        fault_detector.handle_failure(node_id)?;
        Ok(())
    }
}
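#[cfg(test)]
mod cluster_example {
    use super::*;

    // Illustrative end-to-end sketch; every id, capacity, and duration below is a
    // made-up value chosen only to satisfy the scheduler's minimum-resource checks.
    #[test]
    fn submit_job_to_single_node_cluster() {
        let cluster = DistributedCluster::new(ClusterConfig {
            heartbeat_interval: Duration::from_secs(1),
            node_timeout: Duration::from_secs(10),
            job_timeout: Duration::from_secs(300),
            max_retries: 3,
            load_threshold: 0.8,
            replication_factor: 2,
        });
        cluster
            .register_node(ClusterNode {
                id: "node-a".to_string(),
                address: "127.0.0.1:7000".parse().unwrap(),
                capabilities: NodeCapabilities {
                    cpu_cores: 16,
                    memory_gb: 64,
                    gpu_count: 2,
                    storage_gb: 500,
                    network_bandwidth_mbps: 10_000,
                    supported_tasks: HashSet::new(),
                },
                status: NodeStatus::Available,
                last_heartbeat: Instant::now(),
                load_metrics: LoadMetrics {
                    cpu_usage: 0.1,
                    memory_usage: 0.2,
                    gpu_usage: 0.0,
                    network_io: 0.0,
                    disk_io: 0.0,
                    active_jobs: 0,
                    queue_size: 0,
                },
                job_history: Vec::new(),
            })
            .unwrap();
        let job_id = cluster
            .submit_job(DistributedJob {
                id: "job-1".to_string(),
                name: "demo-training".to_string(),
                job_type: JobType::Training,
                priority: JobPriority::High,
                requirements: ResourceRequirements {
                    min_cpu_cores: 4,
                    min_memory_gb: 16,
                    min_gpu_count: 1,
                    min_storage_gb: 50,
                    preferred_node_tags: HashSet::new(),
                    exclusive_access: false,
                },
                created_at: Instant::now(),
                timeout: Duration::from_secs(600),
                retry_count: 0,
                dependencies: Vec::new(),
                metadata: HashMap::new(),
            })
            .unwrap();
        // The job fits the only node, so it is scheduled immediately.
        assert_eq!(cluster.get_job_status(&job_id), Some(JobStatus::Running));
        let stats = cluster.get_cluster_stats();
        assert_eq!(stats.total_nodes, 1);
        assert_eq!(stats.running_jobs, 1);
    }
}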
#[derive(Debug, Clone)]
pub struct ResourceAllocation {
    pub cpu_cores: u32,
    pub memory_gb: u32,
    pub gpu_count: u32,
    pub storage_gb: u32,
    pub network_bandwidth: u32,
}
#[derive(Debug, Clone)]
pub struct JobExecution {
    pub job_id: String,
    pub node_id: String,
    pub start_time: Instant,
    pub end_time: Option<Instant>,
    pub status: JobStatus,
    pub progress: f64,
    pub result: Option<String>,
    pub error: Option<String>,
    pub resource_usage: Option<ResourceUsage>,
}
#[derive(Debug, Clone)]
pub struct ResourceRequirements {
    pub min_cpu_cores: u32,
    pub min_memory_gb: u32,
    pub min_gpu_count: u32,
    pub min_storage_gb: u32,
    pub preferred_node_tags: HashSet<String>,
    pub exclusive_access: bool,
}