sklears_utils/distributed_computing/
functions.rs

1//! Auto-generated module
2//!
3//! 🤖 Generated with [SplitRS](https://github.com/cool-japan/splitrs)
4
5use super::types::*;
6impl Default for AdvancedJobScheduler {
7    fn default() -> Self {
8        Self::new()
9    }
10}
11#[allow(non_snake_case)]
12#[cfg(test)]
13mod tests {
14    use super::*;
15    use std::collections::{HashMap, HashSet};
16    use std::net::{IpAddr, Ipv4Addr, SocketAddr};
17    use std::time::{Duration, Instant};
18    fn create_test_node(id: &str) -> ClusterNode {
19        ClusterNode {
20            id: id.to_string(),
21            address: SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8080),
22            capabilities: NodeCapabilities {
23                cpu_cores: 8,
24                memory_gb: 16,
25                gpu_count: 1,
26                storage_gb: 1000,
27                network_bandwidth_mbps: 1000,
28                supported_tasks: HashSet::from(["training".to_string(), "inference".to_string()]),
29            },
30            status: NodeStatus::Available,
31            last_heartbeat: Instant::now(),
32            load_metrics: LoadMetrics {
33                cpu_usage: 0.3,
34                memory_usage: 0.4,
35                gpu_usage: 0.2,
36                network_io: 0.1,
37                disk_io: 0.1,
38                active_jobs: 1,
39                queue_size: 0,
40            },
41            job_history: Vec::new(),
42        }
43    }
44    fn create_test_job(id: &str) -> DistributedJob {
45        DistributedJob {
46            id: id.to_string(),
47            name: format!("test_job_{id}"),
48            job_type: JobType::Training,
49            priority: JobPriority::Normal,
50            requirements: ResourceRequirements {
51                min_cpu_cores: 2,
52                min_memory_gb: 4,
53                min_gpu_count: 0,
54                min_storage_gb: 10,
55                preferred_node_tags: HashSet::new(),
56                exclusive_access: false,
57            },
58            created_at: Instant::now(),
59            timeout: Duration::from_secs(3600),
60            retry_count: 0,
61            dependencies: Vec::new(),
62            metadata: HashMap::new(),
63        }
64    }
65    #[test]
66    fn test_cluster_creation() {
67        let cluster = DistributedCluster::new(ClusterConfig::default());
68        assert!(cluster.get_nodes().is_empty());
69    }
70    #[test]
71    fn test_node_registration() {
72        let cluster = DistributedCluster::new(ClusterConfig::default());
73        let node = create_test_node("node1");
74        assert!(cluster.register_node(node.clone()).is_ok());
75        assert_eq!(cluster.get_nodes().len(), 1);
76        assert_eq!(cluster.get_nodes()[0].id, "node1");
77    }
78    #[test]
79    fn test_job_submission() {
80        let cluster = DistributedCluster::new(ClusterConfig::default());
81        let node = create_test_node("node1");
82        let job = create_test_job("job1");
83        cluster.register_node(node).unwrap();
84        let job_id = cluster.submit_job(job).unwrap();
85        assert_eq!(job_id, "job1");
86        assert!(cluster.get_job_status(&job_id).is_some());
87    }
88    #[test]
89    fn test_job_scheduling() {
90        let cluster = DistributedCluster::new(ClusterConfig::default());
91        let node = create_test_node("node1");
92        let job = create_test_job("job1");
93        cluster.register_node(node).unwrap();
94        cluster.submit_job(job).unwrap();
95        let status = cluster.get_job_status("job1");
96        assert!(status.is_some());
97    }
98    #[test]
99    fn test_job_cancellation() {
100        let cluster = DistributedCluster::new(ClusterConfig::default());
101        let node = create_test_node("node1");
102        let job = create_test_job("job1");
103        cluster.register_node(node).unwrap();
104        cluster.submit_job(job).unwrap();
105        assert!(cluster.cancel_job("job1").is_ok());
106        let execution = cluster.get_job_execution("job1");
107        assert!(execution.is_some());
108        assert_eq!(execution.unwrap().status, JobStatus::Cancelled);
109    }
110    #[test]
111    fn test_node_heartbeat() {
112        let cluster = DistributedCluster::new(ClusterConfig::default());
113        let node = create_test_node("node1");
114        cluster.register_node(node).unwrap();
115        let new_metrics = LoadMetrics {
116            cpu_usage: 0.8,
117            memory_usage: 0.7,
118            gpu_usage: 0.5,
119            network_io: 0.3,
120            disk_io: 0.2,
121            active_jobs: 2,
122            queue_size: 1,
123        };
124        assert!(cluster.update_heartbeat("node1", new_metrics).is_ok());
125        let nodes = cluster.get_nodes();
126        assert_eq!(nodes[0].load_metrics.cpu_usage, 0.8);
127        assert_eq!(nodes[0].status, NodeStatus::Busy);
128    }
129    #[test]
130    fn test_cluster_stats() {
131        let cluster = DistributedCluster::new(ClusterConfig::default());
132        let node1 = create_test_node("node1");
133        let node2 = create_test_node("node2");
134        cluster.register_node(node1).unwrap();
135        cluster.register_node(node2).unwrap();
136        let stats = cluster.get_cluster_stats();
137        assert_eq!(stats.total_nodes, 2);
138        assert_eq!(stats.available_nodes, 2);
139        assert_eq!(stats.total_cpu_cores, 16);
140        assert_eq!(stats.total_memory_gb, 32);
141    }
142    #[test]
143    fn test_job_scheduler() {
144        let scheduler = JobScheduler::new();
145        let mut nodes = HashMap::new();
146        let node1 = create_test_node("node1");
147        let node2 = create_test_node("node2");
148        nodes.insert("node1".to_string(), node1);
149        nodes.insert("node2".to_string(), node2);
150        let job = create_test_job("job1");
151        let selected_node = scheduler.find_suitable_node(&job, &nodes);
152        assert!(selected_node.is_some());
153        assert!(["node1", "node2"].contains(&selected_node.unwrap().as_str()));
154    }
155    #[test]
156    fn test_load_balancer() {
157        let load_balancer = LoadBalancer::new();
158        let mut nodes = HashMap::new();
159        let node1 = create_test_node("node1");
160        nodes.insert("node1".to_string(), node1);
161        assert!(load_balancer.rebalance(&nodes).is_ok());
162    }
163    #[test]
164    fn test_fault_detector() {
165        let mut fault_detector = FaultDetector::new();
166        assert!(fault_detector.handle_failure("node1").is_ok());
167        assert!(!fault_detector.is_problematic("node1"));
168        for _ in 0..4 {
169            fault_detector.handle_failure("node1").unwrap();
170        }
171        assert!(fault_detector.is_problematic("node1"));
172    }
173    #[test]
174    fn test_node_failure_handling() {
175        let cluster = DistributedCluster::new(ClusterConfig::default());
176        let node = create_test_node("node1");
177        let job = create_test_job("job1");
178        cluster.register_node(node).unwrap();
179        cluster.submit_job(job).unwrap();
180        assert!(cluster.handle_node_failure("node1").is_ok());
181        let execution = cluster.get_job_execution("job1");
182        if let Some(exec) = execution {
183            println!("Job status: {:?}", exec.status);
184        }
185    }
186    #[test]
187    fn test_resource_requirements() {
188        let scheduler = JobScheduler::new();
189        let mut nodes = HashMap::new();
190        let node = create_test_node("node1");
191        nodes.insert("node1".to_string(), node);
192        let mut job = create_test_job("job1");
193        job.requirements.min_cpu_cores = 16;
194        let selected_node = scheduler.find_suitable_node(&job, &nodes);
195        assert!(selected_node.is_none());
196        job.requirements.min_cpu_cores = 4;
197        let selected_node = scheduler.find_suitable_node(&job, &nodes);
198        assert!(selected_node.is_some());
199    }
200    #[test]
201    fn test_job_priorities() {
202        let cluster = DistributedCluster::new(ClusterConfig::default());
203        let node = create_test_node("node1");
204        cluster.register_node(node).unwrap();
205        let mut job1 = create_test_job("job1");
206        job1.priority = JobPriority::Low;
207        let mut job2 = create_test_job("job2");
208        job2.priority = JobPriority::High;
209        cluster.submit_job(job1).unwrap();
210        cluster.submit_job(job2).unwrap();
211        let queue = cluster.job_queue.lock().unwrap();
212        if !queue.is_empty() {
213            assert_eq!(queue[0].priority, JobPriority::High);
214        }
215    }
216    #[test]
217    fn test_message_passing_system() {
218        let mps = MessagePassingSystem::new("node1".to_string());
219        let message = DistributedMessage {
220            id: "msg1".to_string(),
221            source: "node1".to_string(),
222            destination: "node2".to_string(),
223            message_type: MessageType::JobSubmission,
224            data: vec![1, 2, 3, 4],
225            timestamp: Instant::now(),
226            priority: MessagePriority::Normal,
227        };
228        assert!(mps.send_message(message.clone()).is_err());
229        mps.routing_table.write().unwrap().insert(
230            "node2".to_string(),
231            SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8081),
232        );
233        assert!(mps.send_message(message).is_ok());
234    }
235    #[test]
236    fn test_message_broadcasting() {
237        let mps = MessagePassingSystem::new("node1".to_string());
238        mps.routing_table.write().unwrap().insert(
239            "node2".to_string(),
240            SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8081),
241        );
242        mps.routing_table.write().unwrap().insert(
243            "node3".to_string(),
244            SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8082),
245        );
246        let data = vec![5, 6, 7, 8];
247        assert!(mps.broadcast_message(MessageType::Heartbeat, data).is_ok());
248        let queue = mps.message_queue.lock().unwrap();
249        assert_eq!(queue.len(), 2);
250    }
251    #[test]
252    fn test_message_handler() {
253        let handler = MessageHandler::new(|msg: &DistributedMessage| {
254            Ok(MessageResponse {
255                message_id: msg.id.clone(),
256                success: true,
257                data: vec![],
258                error: None,
259            })
260        });
261        let message = DistributedMessage {
262            id: "test_msg".to_string(),
263            source: "node1".to_string(),
264            destination: "node2".to_string(),
265            message_type: MessageType::JobSubmission,
266            data: vec![],
267            timestamp: Instant::now(),
268            priority: MessagePriority::Normal,
269        };
270        let response = handler.handle(&message).unwrap();
271        assert!(response.success);
272        assert_eq!(response.message_id, "test_msg");
273    }
274    #[test]
275    fn test_consensus_manager() {
276        let mut consensus = ConsensusManager::new(
277            "node1".to_string(),
278            vec!["node2".to_string(), "node3".to_string()],
279        );
280        let (state, term) = consensus.get_state();
281        assert_eq!(state, ConsensusState::Follower);
282        assert_eq!(term, 0);
283        assert!(consensus.start_election().is_ok());
284        let (state, term) = consensus.get_state();
285        assert_eq!(state, ConsensusState::Candidate);
286        assert_eq!(term, 1);
287        let vote_request = VoteRequest {
288            term: 2,
289            candidate_id: "node2".to_string(),
290            last_log_index: 0,
291            last_log_term: 0,
292        };
293        let response = consensus.handle_vote_request(vote_request);
294        assert!(response.vote_granted);
295        assert_eq!(response.term, 2);
296    }
297    #[test]
298    fn test_consensus_log_entry() {
299        let mut consensus = ConsensusManager::new("node1".to_string(), vec!["node2".to_string()]);
300        let entry = LogEntry {
301            term: 1,
302            index: 0,
303            command: "test_command".to_string(),
304            data: vec![1, 2, 3],
305        };
306        assert!(consensus.append_entry(entry.clone()).is_err());
307        consensus.state = ConsensusState::Leader;
308        assert!(consensus.append_entry(entry).is_ok());
309        assert_eq!(consensus.log.len(), 1);
310    }
311    #[test]
312    fn test_data_partitioner() {
313        let mut partitioner = DataPartitioner::new(PartitioningStrategy::Hash, 4, 2);
314        let partition1 = partitioner.get_partition("key1");
315        let partition2 = partitioner.get_partition("key2");
316        assert!(partition1 < 4);
317        assert!(partition2 < 4);
318        assert_eq!(partition1, partitioner.get_partition("key1"));
319        partitioner.assign_partition(0, "node1".to_string());
320        partitioner.assign_partition(1, "node2".to_string());
321        let nodes = partitioner.get_partition_nodes(0);
322        assert!(nodes.contains(&"node1".to_string()));
323    }
324    #[test]
325    fn test_data_partitioner_rebalancing() {
326        let mut partitioner = DataPartitioner::new(PartitioningStrategy::Hash, 4, 1);
327        let nodes = vec!["node1".to_string(), "node2".to_string()];
328        let result = partitioner.rebalance_partitions(&nodes);
329        assert_eq!(result.assignments_changed, 4);
330        assert_eq!(result.partitions_moved.len(), 0);
331    }
332    #[test]
333    fn test_partitioning_strategies() {
334        let hash_partitioner = DataPartitioner::new(PartitioningStrategy::Hash, 4, 1);
335        let range_partitioner = DataPartitioner::new(PartitioningStrategy::Range, 4, 1);
336        let random_partitioner = DataPartitioner::new(PartitioningStrategy::Random, 4, 1);
337        let key = "test_key";
338        let hash_partition = hash_partitioner.get_partition(key);
339        let range_partition = range_partitioner.get_partition(key);
340        let random_partition = random_partitioner.get_partition(key);
341        assert!(hash_partition < 4);
342        assert!(range_partition < 4);
343        assert!(random_partition < 4);
344    }
345    #[test]
346    fn test_advanced_job_scheduler() {
347        let mut scheduler = AdvancedJobScheduler::new();
348        let mut nodes = HashMap::new();
349        let node1 = create_test_node("node1");
350        let node2 = create_test_node("node2");
351        nodes.insert("node1".to_string(), node1);
352        nodes.insert("node2".to_string(), node2);
353        let jobs = vec![create_test_job("job1"), create_test_job("job2")];
354        let decisions = scheduler.gang_schedule(&jobs, &nodes).unwrap();
355        assert_eq!(decisions.len(), jobs.len());
356        for decision in &decisions {
357            assert!(nodes.contains_key(&decision.node_id));
358            assert!(decision.resource_allocation.cpu_cores > 0);
359        }
360    }
361    #[test]
362    fn test_backfill_scheduling() {
363        let mut scheduler = AdvancedJobScheduler::new();
364        let mut nodes = HashMap::new();
365        let node1 = create_test_node("node1");
366        nodes.insert("node1".to_string(), node1);
367        let waiting_jobs = vec![create_test_job("waiting_job")];
368        let decisions = scheduler.backfill_schedule(&waiting_jobs, &nodes).unwrap();
369        assert_eq!(decisions.len(), 1);
370        assert_eq!(decisions[0].job_id, "waiting_job");
371    }
372    #[test]
373    fn test_resource_reservation() {
374        let mut scheduler = AdvancedJobScheduler::new();
375        let reservation = ResourceReservation {
376            id: "reservation1".to_string(),
377            node_id: "node1".to_string(),
378            start_time: Instant::now(),
379            duration: Duration::from_secs(3600),
380            resources: ResourceAllocation {
381                cpu_cores: 4,
382                memory_gb: 8,
383                gpu_count: 1,
384                storage_gb: 100,
385                network_bandwidth: 1000,
386            },
387        };
388        assert!(scheduler.reserve_resources(reservation).is_ok());
389        assert_eq!(scheduler.resource_reservations.len(), 1);
390    }
391    #[test]
392    fn test_checkpoint_manager() {
393        let mut checkpoint_mgr = CheckpointManager::new(Duration::from_secs(300));
394        let job_state = JobState {
395            progress: 0.5,
396            intermediate_results: HashMap::new(),
397            runtime_state: vec![1, 2, 3, 4],
398        };
399        let checkpoint_id = checkpoint_mgr
400            .create_checkpoint("job1", job_state.clone())
401            .unwrap();
402        assert!(!checkpoint_id.is_empty());
403        let restored_state = checkpoint_mgr.restore_checkpoint(&checkpoint_id).unwrap();
404        assert_eq!(restored_state.progress, 0.5);
405        assert_eq!(restored_state.runtime_state, vec![1, 2, 3, 4]);
406        let stats = checkpoint_mgr.get_checkpoint_stats();
407        assert_eq!(stats.total_checkpoints, 1);
408        assert!(stats.total_size_bytes > 0);
409    }
410    #[test]
411    fn test_checkpoint_cleanup() {
412        let mut checkpoint_mgr = CheckpointManager::new(Duration::from_secs(300));
413        let job_state = JobState {
414            progress: 1.0,
415            intermediate_results: HashMap::new(),
416            runtime_state: vec![],
417        };
418        checkpoint_mgr
419            .create_checkpoint("job1", job_state.clone())
420            .unwrap();
421        checkpoint_mgr.create_checkpoint("job2", job_state).unwrap();
422        assert_eq!(checkpoint_mgr.checkpoint_storage.len(), 2);
423        checkpoint_mgr.cleanup_old_checkpoints(Duration::from_secs(0));
424        assert_eq!(checkpoint_mgr.checkpoint_storage.len(), 0);
425    }
426    #[test]
427    fn test_message_type_conversion() {
428        assert_eq!(format!("{}", MessageType::JobSubmission), "job_submission");
429        assert_eq!(format!("{}", MessageType::JobResult), "job_result");
430        assert_eq!(format!("{}", MessageType::Heartbeat), "heartbeat");
431        assert_eq!(
432            format!("{}", MessageType::ResourceUpdate),
433            "resource_update"
434        );
435        assert_eq!(
436            format!("{}", MessageType::ConsensusRequest),
437            "consensus_request"
438        );
439        assert_eq!(format!("{}", MessageType::DataPartition), "data_partition");
440        assert_eq!(
441            format!("{}", MessageType::Custom("test".to_string())),
442            "test"
443        );
444    }
445    #[test]
446    fn test_consensus_states() {
447        let follower = ConsensusState::Follower;
448        let candidate = ConsensusState::Candidate;
449        let leader = ConsensusState::Leader;
450        assert_eq!(follower, ConsensusState::Follower);
451        assert_eq!(candidate, ConsensusState::Candidate);
452        assert_eq!(leader, ConsensusState::Leader);
453    }
454    #[test]
455    fn test_scheduling_policies() {
456        let policies = [
457            SchedulingPolicy::FIFO,
458            SchedulingPolicy::ShortestJobFirst,
459            SchedulingPolicy::GangScheduling,
460            SchedulingPolicy::Backfill,
461            SchedulingPolicy::PriorityBased,
462        ];
463        assert_eq!(policies.len(), 5);
464    }
465    #[test]
466    fn test_message_priorities() {
467        let low = MessagePriority::Low;
468        let normal = MessagePriority::Normal;
469        let high = MessagePriority::High;
470        let critical = MessagePriority::Critical;
471        match low {
472            MessagePriority::Low => {}
473            _ => panic!(),
474        }
475        match normal {
476            MessagePriority::Normal => {}
477            _ => panic!(),
478        }
479        match high {
480            MessagePriority::High => {}
481            _ => panic!(),
482        }
483        match critical {
484            MessagePriority::Critical => {}
485            _ => panic!(),
486        }
487    }
488    #[test]
489    fn test_resource_allocation_calculations() {
490        let allocation = ResourceAllocation {
491            cpu_cores: 8,
492            memory_gb: 16,
493            gpu_count: 2,
494            storage_gb: 500,
495            network_bandwidth: 1000,
496        };
497        assert_eq!(allocation.cpu_cores, 8);
498        assert_eq!(allocation.memory_gb, 16);
499        assert_eq!(allocation.gpu_count, 2);
500        assert_eq!(allocation.storage_gb, 500);
501        assert_eq!(allocation.network_bandwidth, 1000);
502    }
503    #[test]
504    fn test_job_state_serialization() {
505        let mut intermediate_results = HashMap::new();
506        intermediate_results.insert("result1".to_string(), vec![1, 2, 3]);
507        intermediate_results.insert("result2".to_string(), vec![4, 5, 6]);
508        let job_state = JobState {
509            progress: 0.75,
510            intermediate_results,
511            runtime_state: vec![7, 8, 9],
512        };
513        assert_eq!(job_state.progress, 0.75);
514        assert_eq!(job_state.intermediate_results.len(), 2);
515        assert_eq!(job_state.runtime_state, vec![7, 8, 9]);
516        assert!(job_state.intermediate_results.contains_key("result1"));
517        assert!(job_state.intermediate_results.contains_key("result2"));
518    }
519}