1use super::types::*;
6impl Default for AdvancedJobScheduler {
7 fn default() -> Self {
8 Self::new()
9 }
10}
11#[allow(non_snake_case)]
12#[cfg(test)]
13mod tests {
14 use super::*;
15 use std::collections::{HashMap, HashSet};
16 use std::net::{IpAddr, Ipv4Addr, SocketAddr};
17 use std::time::{Duration, Instant};
18 fn create_test_node(id: &str) -> ClusterNode {
19 ClusterNode {
20 id: id.to_string(),
21 address: SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8080),
22 capabilities: NodeCapabilities {
23 cpu_cores: 8,
24 memory_gb: 16,
25 gpu_count: 1,
26 storage_gb: 1000,
27 network_bandwidth_mbps: 1000,
28 supported_tasks: HashSet::from(["training".to_string(), "inference".to_string()]),
29 },
30 status: NodeStatus::Available,
31 last_heartbeat: Instant::now(),
32 load_metrics: LoadMetrics {
33 cpu_usage: 0.3,
34 memory_usage: 0.4,
35 gpu_usage: 0.2,
36 network_io: 0.1,
37 disk_io: 0.1,
38 active_jobs: 1,
39 queue_size: 0,
40 },
41 job_history: Vec::new(),
42 }
43 }
44 fn create_test_job(id: &str) -> DistributedJob {
45 DistributedJob {
46 id: id.to_string(),
47 name: format!("test_job_{id}"),
48 job_type: JobType::Training,
49 priority: JobPriority::Normal,
50 requirements: ResourceRequirements {
51 min_cpu_cores: 2,
52 min_memory_gb: 4,
53 min_gpu_count: 0,
54 min_storage_gb: 10,
55 preferred_node_tags: HashSet::new(),
56 exclusive_access: false,
57 },
58 created_at: Instant::now(),
59 timeout: Duration::from_secs(3600),
60 retry_count: 0,
61 dependencies: Vec::new(),
62 metadata: HashMap::new(),
63 }
64 }
/// A cluster built from the default configuration starts with no nodes.
#[test]
fn test_cluster_creation() {
    let config = ClusterConfig::default();
    let cluster = DistributedCluster::new(config);
    let nodes = cluster.get_nodes();
    assert!(nodes.is_empty());
}
/// Registering a single node succeeds and the node becomes visible via `get_nodes`.
#[test]
fn test_node_registration() {
    let cluster = DistributedCluster::new(ClusterConfig::default());
    let node = create_test_node("node1");

    // The node is not used after registration, so it is moved instead of cloned.
    assert!(cluster.register_node(node).is_ok());

    let nodes = cluster.get_nodes();
    assert_eq!(nodes.len(), 1);
    assert_eq!(nodes[0].id, "node1");
}
/// Submitting a job returns the job's own id and makes a status available for it.
#[test]
fn test_job_submission() {
    let cluster = DistributedCluster::new(ClusterConfig::default());
    let (worker, task) = (create_test_node("node1"), create_test_job("job1"));
    cluster.register_node(worker).unwrap();

    let submitted_id = cluster.submit_job(task).unwrap();

    assert_eq!(submitted_id, "job1");
    assert!(cluster.get_job_status(&submitted_id).is_some());
}
/// After a node is registered and a job submitted, the cluster reports a status
/// for that job. Only the presence of a status is checked, not its value.
#[test]
fn test_job_scheduling() {
    let cluster = DistributedCluster::new(ClusterConfig::default());
    let (worker, task) = (create_test_node("node1"), create_test_job("job1"));
    cluster.register_node(worker).unwrap();
    cluster.submit_job(task).unwrap();

    assert!(cluster.get_job_status("job1").is_some());
}
/// Cancelling a submitted job succeeds and leaves an execution record whose
/// status is `JobStatus::Cancelled`.
#[test]
fn test_job_cancellation() {
    let cluster = DistributedCluster::new(ClusterConfig::default());
    let node = create_test_node("node1");
    let job = create_test_job("job1");
    cluster.register_node(node).unwrap();
    cluster.submit_job(job).unwrap();

    assert!(cluster.cancel_job("job1").is_ok());

    // `expect` replaces the `is_some()` + `unwrap()` pair and names the broken
    // expectation if the record is missing.
    let execution = cluster
        .get_job_execution("job1")
        .expect("a cancelled job should still have an execution record");
    assert_eq!(execution.status, JobStatus::Cancelled);
}
/// A heartbeat carrying new load metrics is accepted, the metrics are stored on
/// the node, and the node is reported as `NodeStatus::Busy` afterwards.
#[test]
fn test_node_heartbeat() {
    let cluster = DistributedCluster::new(ClusterConfig::default());
    cluster.register_node(create_test_node("node1")).unwrap();

    let reported_load = LoadMetrics {
        cpu_usage: 0.8,
        memory_usage: 0.7,
        gpu_usage: 0.5,
        network_io: 0.3,
        disk_io: 0.2,
        active_jobs: 2,
        queue_size: 1,
    };
    assert!(cluster.update_heartbeat("node1", reported_load).is_ok());

    let registered = cluster.get_nodes();
    let node = &registered[0];
    assert_eq!(node.load_metrics.cpu_usage, 0.8);
    assert_eq!(node.status, NodeStatus::Busy);
}
/// Cluster statistics aggregate over all registered nodes: two fixture nodes
/// (8 cores / 16 GB each) yield 2 nodes, 2 available, 16 cores and 32 GB.
#[test]
fn test_cluster_stats() {
    let cluster = DistributedCluster::new(ClusterConfig::default());
    let fixtures = [create_test_node("node1"), create_test_node("node2")];
    for node in fixtures {
        cluster.register_node(node).unwrap();
    }

    let stats = cluster.get_cluster_stats();

    assert_eq!(stats.total_nodes, 2);
    assert_eq!(stats.available_nodes, 2);
    assert_eq!(stats.total_cpu_cores, 16);
    assert_eq!(stats.total_memory_gb, 32);
}
/// With two identical candidate nodes, the scheduler picks one of them for a
/// job whose requirements both satisfy.
#[test]
fn test_job_scheduler() {
    let scheduler = JobScheduler::new();
    let mut nodes = HashMap::new();
    nodes.insert("node1".to_string(), create_test_node("node1"));
    nodes.insert("node2".to_string(), create_test_node("node2"));
    let job = create_test_job("job1");

    // `expect` replaces the `is_some()` + `unwrap()` pair and names the broken
    // expectation if no node is selected.
    let selected_node = scheduler
        .find_suitable_node(&job, &nodes)
        .expect("one of the two candidate nodes should be selected");

    // Either node is acceptable; the test does not pin the tie-break.
    assert!(["node1", "node2"].contains(&selected_node.as_str()));
}
/// Rebalancing a single-node map completes without error.
#[test]
fn test_load_balancer() {
    let balancer = LoadBalancer::new();
    let mut cluster_nodes = HashMap::new();
    cluster_nodes.insert(String::from("node1"), create_test_node("node1"));

    let outcome = balancer.rebalance(&cluster_nodes);

    assert!(outcome.is_ok());
}
/// One reported failure does not mark a node as problematic; after four more
/// reports (five in total) the node is flagged.
#[test]
fn test_fault_detector() {
    let mut detector = FaultDetector::new();

    assert!(detector.handle_failure("node1").is_ok());
    assert!(!detector.is_problematic("node1"));

    (0..4).for_each(|_| {
        detector.handle_failure("node1").unwrap();
    });
    assert!(detector.is_problematic("node1"));
}
/// Handling the failure of a node that has a submitted job completes without error.
#[test]
fn test_node_failure_handling() {
    let cluster = DistributedCluster::new(ClusterConfig::default());
    let (worker, task) = (create_test_node("node1"), create_test_job("job1"));
    cluster.register_node(worker).unwrap();
    cluster.submit_job(task).unwrap();

    assert!(cluster.handle_node_failure("node1").is_ok());

    // Diagnostic output only: the job's resulting status is printed, not asserted.
    if let Some(exec) = cluster.get_job_execution("job1") {
        println!("Job status: {:?}", exec.status);
    }
}
/// The scheduler rejects a job that needs more CPU cores than the only node
/// offers (16 > 8) and accepts it once the requirement fits (4 <= 8).
#[test]
fn test_resource_requirements() {
    let scheduler = JobScheduler::new();
    let nodes = HashMap::from([("node1".to_string(), create_test_node("node1"))]);
    let mut job = create_test_job("job1");

    job.requirements.min_cpu_cores = 16;
    assert!(scheduler.find_suitable_node(&job, &nodes).is_none());

    job.requirements.min_cpu_cores = 4;
    assert!(scheduler.find_suitable_node(&job, &nodes).is_some());
}
/// When a low-priority job is submitted before a high-priority one, the
/// high-priority job sits at the front of the pending queue.
#[test]
fn test_job_priorities() {
    let cluster = DistributedCluster::new(ClusterConfig::default());
    cluster.register_node(create_test_node("node1")).unwrap();

    let low_priority_job = DistributedJob {
        priority: JobPriority::Low,
        ..create_test_job("job1")
    };
    let high_priority_job = DistributedJob {
        priority: JobPriority::High,
        ..create_test_job("job2")
    };
    cluster.submit_job(low_priority_job).unwrap();
    cluster.submit_job(high_priority_job).unwrap();

    let queue = cluster.job_queue.lock().unwrap();
    // NOTE(review): if the queue is already empty the ordering check is skipped
    // and the test passes vacuously — confirm this is intended.
    if queue.is_empty() {
        return;
    }
    assert_eq!(queue[0].priority, JobPriority::High);
}
/// Sending to "node2" fails while the routing table has no entry for it and
/// succeeds once an address for "node2" has been inserted.
#[test]
fn test_message_passing_system() {
    let mps = MessagePassingSystem::new(String::from("node1"));
    let outgoing = DistributedMessage {
        id: String::from("msg1"),
        source: String::from("node1"),
        destination: String::from("node2"),
        message_type: MessageType::JobSubmission,
        data: vec![1, 2, 3, 4],
        timestamp: Instant::now(),
        priority: MessagePriority::Normal,
    };

    assert!(mps.send_message(outgoing.clone()).is_err());

    {
        // Scoped so the write guard is released before the next send.
        let mut routes = mps.routing_table.write().unwrap();
        routes.insert(
            String::from("node2"),
            SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 8081),
        );
    }

    assert!(mps.send_message(outgoing).is_ok());
}
/// Broadcasting with two routed peers succeeds and leaves two messages in the
/// outgoing queue.
#[test]
fn test_message_broadcasting() {
    let mps = MessagePassingSystem::new(String::from("node1"));

    let peers: [(&str, u16); 2] = [("node2", 8081), ("node3", 8082)];
    for (peer, port) in peers {
        // Each insert takes and releases the write lock on its own, as before.
        mps.routing_table.write().unwrap().insert(
            peer.to_string(),
            SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), port),
        );
    }

    let payload = vec![5, 6, 7, 8];
    assert!(mps
        .broadcast_message(MessageType::Heartbeat, payload)
        .is_ok());

    let pending = mps.message_queue.lock().unwrap();
    assert_eq!(pending.len(), 2);
}
/// A handler wrapping a closure returns that closure's response: here a
/// successful reply echoing the incoming message id.
#[test]
fn test_message_handler() {
    let handler = MessageHandler::new(|incoming: &DistributedMessage| {
        Ok(MessageResponse {
            message_id: incoming.id.clone(),
            success: true,
            data: Vec::new(),
            error: None,
        })
    });

    let incoming = DistributedMessage {
        id: String::from("test_msg"),
        source: String::from("node1"),
        destination: String::from("node2"),
        message_type: MessageType::JobSubmission,
        data: Vec::new(),
        timestamp: Instant::now(),
        priority: MessagePriority::Normal,
    };

    let reply = handler.handle(&incoming).unwrap();
    assert!(reply.success);
    assert_eq!(reply.message_id, "test_msg");
}
/// Walks a consensus manager through its early states: it starts as a follower
/// in term 0, becomes a candidate in term 1 after starting an election, and
/// grants its vote to a request carrying the higher term 2.
#[test]
fn test_consensus_manager() {
    let peers = vec![String::from("node2"), String::from("node3")];
    let mut consensus = ConsensusManager::new(String::from("node1"), peers);

    let (initial_state, initial_term) = consensus.get_state();
    assert_eq!(initial_state, ConsensusState::Follower);
    assert_eq!(initial_term, 0);

    assert!(consensus.start_election().is_ok());
    let (election_state, election_term) = consensus.get_state();
    assert_eq!(election_state, ConsensusState::Candidate);
    assert_eq!(election_term, 1);

    let reply = consensus.handle_vote_request(VoteRequest {
        term: 2,
        candidate_id: String::from("node2"),
        last_log_index: 0,
        last_log_term: 0,
    });
    assert!(reply.vote_granted);
    assert_eq!(reply.term, 2);
}
/// Appending a log entry is rejected in the manager's initial state and
/// accepted once the state is forced to `Leader`, after which the log holds
/// exactly one entry.
#[test]
fn test_consensus_log_entry() {
    let mut consensus =
        ConsensusManager::new(String::from("node1"), vec![String::from("node2")]);
    let entry = LogEntry {
        term: 1,
        index: 0,
        command: String::from("test_command"),
        data: vec![1, 2, 3],
    };

    assert!(consensus.append_entry(entry.clone()).is_err());

    // Promote directly through the field rather than running an election.
    consensus.state = ConsensusState::Leader;
    assert!(consensus.append_entry(entry).is_ok());
    assert_eq!(consensus.log.len(), 1);
}
/// Hash partitioning maps keys into the configured range (4 partitions), gives
/// the same partition for a repeated key, and reports assigned nodes per partition.
#[test]
fn test_data_partitioner() {
    let mut partitioner = DataPartitioner::new(PartitioningStrategy::Hash, 4, 2);

    let [first, second] = ["key1", "key2"].map(|key| partitioner.get_partition(key));
    assert!(first < 4);
    assert!(second < 4);
    // Looking the same key up again must land in the same partition.
    assert_eq!(first, partitioner.get_partition("key1"));

    for (partition, owner) in [(0, "node1"), (1, "node2")] {
        partitioner.assign_partition(partition, owner.to_string());
    }
    let owners = partitioner.get_partition_nodes(0);
    assert!(owners.contains(&"node1".to_string()));
}
/// Rebalancing 4 unassigned partitions across two nodes reports 4 changed
/// assignments and no moved partitions.
#[test]
fn test_data_partitioner_rebalancing() {
    let mut partitioner = DataPartitioner::new(PartitioningStrategy::Hash, 4, 1);
    let members: Vec<String> = ["node1", "node2"]
        .iter()
        .map(|name| name.to_string())
        .collect();

    let outcome = partitioner.rebalance_partitions(&members);

    assert_eq!(outcome.assignments_changed, 4);
    assert_eq!(outcome.partitions_moved.len(), 0);
}
/// Every partitioning strategy (hash, range, random) maps a key to a partition
/// index below the configured partition count of 4.
#[test]
fn test_partitioning_strategies() {
    let partitioners = [
        PartitioningStrategy::Hash,
        PartitioningStrategy::Range,
        PartitioningStrategy::Random,
    ]
    .map(|strategy| DataPartitioner::new(strategy, 4, 1));

    let key = "test_key";
    let [hash_slot, range_slot, random_slot] =
        [0, 1, 2].map(|which: usize| partitioners[which].get_partition(key));

    assert!(hash_slot < 4);
    assert!(range_slot < 4);
    assert!(random_slot < 4);
}
/// Gang scheduling two jobs over two nodes yields one decision per job, each
/// targeting a known node with a non-zero CPU allocation.
#[test]
fn test_advanced_job_scheduler() {
    let mut scheduler = AdvancedJobScheduler::new();
    let nodes = HashMap::from([
        ("node1".to_string(), create_test_node("node1")),
        ("node2".to_string(), create_test_node("node2")),
    ]);
    let jobs = vec![create_test_job("job1"), create_test_job("job2")];

    let decisions = scheduler.gang_schedule(&jobs, &nodes).unwrap();

    assert_eq!(decisions.len(), jobs.len());
    decisions.iter().for_each(|decision| {
        assert!(nodes.contains_key(&decision.node_id));
        assert!(decision.resource_allocation.cpu_cores > 0);
    });
}
/// Backfill scheduling a single waiting job on a single node produces exactly
/// one decision, and that decision is for the waiting job.
#[test]
fn test_backfill_scheduling() {
    let mut scheduler = AdvancedJobScheduler::new();
    let nodes = HashMap::from([("node1".to_string(), create_test_node("node1"))]);
    let pending = vec![create_test_job("waiting_job")];

    let decisions = scheduler.backfill_schedule(&pending, &nodes).unwrap();

    assert_eq!(decisions.len(), 1);
    assert_eq!(decisions[0].job_id, "waiting_job");
}
/// Reserving resources succeeds and the reservation is recorded by the scheduler.
#[test]
fn test_resource_reservation() {
    let mut scheduler = AdvancedJobScheduler::new();

    let resources = ResourceAllocation {
        cpu_cores: 4,
        memory_gb: 8,
        gpu_count: 1,
        storage_gb: 100,
        network_bandwidth: 1000,
    };
    // One-hour reservation on "node1" starting now.
    let reservation = ResourceReservation {
        id: String::from("reservation1"),
        node_id: String::from("node1"),
        start_time: Instant::now(),
        duration: Duration::from_secs(3600),
        resources,
    };

    let outcome = scheduler.reserve_resources(reservation);
    assert!(outcome.is_ok());
    assert_eq!(scheduler.resource_reservations.len(), 1);
}
/// A checkpoint can be created and restored: the restored state matches what
/// was saved, and the manager's statistics reflect one non-empty checkpoint.
#[test]
fn test_checkpoint_manager() {
    let mut checkpoint_mgr = CheckpointManager::new(Duration::from_secs(300));
    let job_state = JobState {
        progress: 0.5,
        intermediate_results: HashMap::new(),
        runtime_state: vec![1, 2, 3, 4],
    };

    // The state is not used again in this test, so it is moved instead of cloned.
    let checkpoint_id = checkpoint_mgr
        .create_checkpoint("job1", job_state)
        .unwrap();
    assert!(!checkpoint_id.is_empty());

    let restored_state = checkpoint_mgr.restore_checkpoint(&checkpoint_id).unwrap();
    assert_eq!(restored_state.progress, 0.5);
    assert_eq!(restored_state.runtime_state, vec![1, 2, 3, 4]);

    let stats = checkpoint_mgr.get_checkpoint_stats();
    assert_eq!(stats.total_checkpoints, 1);
    assert!(stats.total_size_bytes > 0);
}
/// Cleaning up with a zero maximum age removes every stored checkpoint.
#[test]
fn test_checkpoint_cleanup() {
    let mut manager = CheckpointManager::new(Duration::from_secs(300));
    let finished = JobState {
        progress: 1.0,
        intermediate_results: HashMap::new(),
        runtime_state: Vec::new(),
    };

    manager.create_checkpoint("job1", finished.clone()).unwrap();
    manager.create_checkpoint("job2", finished).unwrap();
    assert_eq!(manager.checkpoint_storage.len(), 2);

    manager.cleanup_old_checkpoints(Duration::from_secs(0));
    assert_eq!(manager.checkpoint_storage.len(), 0);
}
/// Each `MessageType` variant renders through `Display` as its snake_case wire
/// name; a `Custom` variant renders as the string it carries.
#[test]
fn test_message_type_conversion() {
    // Table of (variant, expected rendering). `to_string()` goes through the
    // same `Display` impl that `format!("{}", ..)` used.
    let cases = [
        (MessageType::JobSubmission, "job_submission"),
        (MessageType::JobResult, "job_result"),
        (MessageType::Heartbeat, "heartbeat"),
        (MessageType::ResourceUpdate, "resource_update"),
        (MessageType::ConsensusRequest, "consensus_request"),
        (MessageType::DataPartition, "data_partition"),
        (MessageType::Custom("test".to_string()), "test"),
    ];

    for (message_type, expected) in cases {
        assert_eq!(message_type.to_string(), expected);
    }
}
/// Each `ConsensusState` variant compares equal to itself.
#[test]
fn test_consensus_states() {
    let (follower, candidate, leader) = (
        ConsensusState::Follower,
        ConsensusState::Candidate,
        ConsensusState::Leader,
    );

    assert_eq!(follower, ConsensusState::Follower);
    assert_eq!(candidate, ConsensusState::Candidate);
    assert_eq!(leader, ConsensusState::Leader);
}
/// All five scheduling policy variants can be constructed.
#[test]
fn test_scheduling_policies() {
    let policy_count = [
        SchedulingPolicy::FIFO,
        SchedulingPolicy::ShortestJobFirst,
        SchedulingPolicy::GangScheduling,
        SchedulingPolicy::Backfill,
        SchedulingPolicy::PriorityBased,
    ]
    .len();

    assert_eq!(policy_count, 5);
}
/// Each `MessagePriority` variant can be constructed and matches its own pattern.
#[test]
fn test_message_priorities() {
    let low = MessagePriority::Low;
    let normal = MessagePriority::Normal;
    let high = MessagePriority::High;
    let critical = MessagePriority::Critical;

    // `assert!(matches!(..))` replaces the `match .. _ => panic!()` blocks: a
    // failure now reports the offending expression instead of a bare panic.
    assert!(matches!(low, MessagePriority::Low));
    assert!(matches!(normal, MessagePriority::Normal));
    assert!(matches!(high, MessagePriority::High));
    assert!(matches!(critical, MessagePriority::Critical));
}
/// A `ResourceAllocation` retains the field values it was constructed with.
/// No calculation is performed; the fields are only read back.
#[test]
fn test_resource_allocation_calculations() {
    let ResourceAllocation {
        cpu_cores,
        memory_gb,
        gpu_count,
        storage_gb,
        network_bandwidth,
    } = ResourceAllocation {
        cpu_cores: 8,
        memory_gb: 16,
        gpu_count: 2,
        storage_gb: 500,
        network_bandwidth: 1000,
    };

    assert_eq!(cpu_cores, 8);
    assert_eq!(memory_gb, 16);
    assert_eq!(gpu_count, 2);
    assert_eq!(storage_gb, 500);
    assert_eq!(network_bandwidth, 1000);
}
/// A `JobState` retains its progress, intermediate results and runtime state.
/// Despite the name, nothing is serialized here; the fields are only read back.
#[test]
fn test_job_state_serialization() {
    let job_state = JobState {
        progress: 0.75,
        intermediate_results: HashMap::from([
            ("result1".to_string(), vec![1, 2, 3]),
            ("result2".to_string(), vec![4, 5, 6]),
        ]),
        runtime_state: vec![7, 8, 9],
    };

    assert_eq!(job_state.progress, 0.75);
    assert_eq!(job_state.intermediate_results.len(), 2);
    assert_eq!(job_state.runtime_state, vec![7, 8, 9]);
    for key in ["result1", "result2"] {
        assert!(job_state.intermediate_results.contains_key(key));
    }
}
519}