use scirs2_core::ndarray::{Array1, Array2, Axis};
use scirs2_core::numeric::Float;
use std::collections::HashMap;
use std::fmt::Debug;
use std::time::{Duration, Instant};

use crate::error::{Result, TimeSeriesError};
use statrs::statistics::Statistics;

/// Configuration for a distributed time series processing cluster.
#[derive(Debug, Clone)]
pub struct ClusterConfig {
    pub nodes: Vec<String>,
    pub max_concurrent_tasks: usize,
    pub task_timeout: Duration,
    pub chunk_size: usize,
    pub load_balancing: LoadBalancingStrategy,
    pub fault_tolerance: FaultToleranceConfig,
}

impl Default for ClusterConfig {
    fn default() -> Self {
        Self {
            nodes: vec!["localhost:8080".to_string()],
            max_concurrent_tasks: 4,
            task_timeout: Duration::from_secs(30),
            chunk_size: 10000,
            load_balancing: LoadBalancingStrategy::RoundRobin,
            fault_tolerance: FaultToleranceConfig::default(),
        }
    }
}

/// Strategy used to assign tasks to cluster nodes.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum LoadBalancingStrategy {
    RoundRobin,
    LoadBased,
    Random,
    Weighted,
}

/// Retry, replication, and failure detection settings for the cluster.
#[derive(Debug, Clone)]
pub struct FaultToleranceConfig {
    pub max_retries: usize,
    pub retry_delay: Duration,
    pub enable_replication: bool,
    pub replication_factor: usize,
    pub failure_detection_timeout: Duration,
}

impl Default for FaultToleranceConfig {
    fn default() -> Self {
        Self {
            max_retries: 3,
            retry_delay: Duration::from_millis(500),
            enable_replication: false,
            replication_factor: 2,
            failure_detection_timeout: Duration::from_secs(10),
        }
    }
}

/// A unit of work submitted to the cluster.
#[derive(Debug, Clone)]
pub struct DistributedTask<F: Float> {
    pub id: String,
    pub task_type: TaskType,
    pub input_data: Array1<F>,
    pub parameters: HashMap<String, f64>,
    pub priority: TaskPriority,
    pub dependencies: Vec<String>,
}

/// Kind of computation a task performs.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum TaskType {
    Forecasting,
    Decomposition,
    FeatureExtraction,
    AnomalyDetection,
    CrossCorrelation,
    FourierTransform,
    WaveletTransform,
    Custom(String),
}

/// Scheduling priority; higher values are scheduled first.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub enum TaskPriority {
    Low = 1,
    Normal = 2,
    High = 3,
    Critical = 4,
}

/// Outcome of a task, including any output data and execution metrics.
#[derive(Debug, Clone)]
pub struct TaskResult<F: Float> {
    pub taskid: String,
    pub status: TaskStatus,
    pub data: Option<Array1<F>>,
    pub metrics: TaskMetrics,
    pub error: Option<String>,
}

/// Lifecycle state of a task.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TaskStatus {
    Pending,
    Running,
    Completed,
    Failed,
    Cancelled,
    Timeout,
}

/// Execution statistics collected for a completed task.
#[derive(Debug, Clone)]
pub struct TaskMetrics {
    pub execution_time: Duration,
    pub executed_on: String,
    pub memory_usage: usize,
    pub cpu_utilization: f64,
    pub network_time: Duration,
}

impl Default for TaskMetrics {
    fn default() -> Self {
        Self {
            execution_time: Duration::ZERO,
            executed_on: String::new(),
            memory_usage: 0,
            cpu_utilization: 0.0,
            network_time: Duration::ZERO,
        }
    }
}

/// Resource and health information for a single cluster node.
#[derive(Debug, Clone)]
pub struct NodeInfo {
    pub address: String,
    pub status: NodeStatus,
    pub cpu_cores: usize,
    pub total_memory: usize,
    pub available_memory: usize,
    pub current_load: f64,
    pub running_tasks: usize,
    pub capabilities: Vec<String>,
    pub last_heartbeat: Instant,
}

/// Availability state of a cluster node.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum NodeStatus {
    Available,
    Busy,
    Offline,
    Maintenance,
    Failed,
}

/// Coordinator that splits work into tasks and dispatches them across nodes.
pub struct DistributedProcessor<
    F: Float
        + Debug
        + Clone
        + scirs2_core::numeric::FromPrimitive
        + scirs2_core::numeric::Zero
        + scirs2_core::ndarray::ScalarOperand,
> {
    config: ClusterConfig,
    nodes: HashMap<String, NodeInfo>,
    task_queue: Vec<DistributedTask<F>>,
    running_tasks: HashMap<String, DistributedTask<F>>,
    completed_tasks: HashMap<String, TaskResult<F>>,
    load_balancer_state: LoadBalancerState,
}

#[derive(Debug, Default)]
struct LoadBalancerState {
    round_robin_counter: usize,
    #[allow(dead_code)]
    node_weights: HashMap<String, f64>,
    #[allow(dead_code)]
    load_history: HashMap<String, Vec<f64>>,
}

impl<
        F: Float
            + Debug
            + Clone
            + scirs2_core::numeric::FromPrimitive
            + scirs2_core::numeric::Zero
            + scirs2_core::ndarray::ScalarOperand,
    > DistributedProcessor<F>
{
    /// Creates a processor and registers every configured node with placeholder
    /// hardware specs; no connection is made to the nodes at this point.
    pub fn new(config: ClusterConfig) -> Self {
        let mut nodes = HashMap::new();

        for address in &config.nodes {
            nodes.insert(
                address.clone(),
                NodeInfo {
                    address: address.clone(),
                    status: NodeStatus::Available,
                    // Placeholder hardware specs: 4 cores, 8 GiB total, 6 GiB free.
                    cpu_cores: 4,
                    total_memory: 8 * 1024 * 1024 * 1024,
                    available_memory: 6 * 1024 * 1024 * 1024,
                    current_load: 0.0,
                    running_tasks: 0,
                    capabilities: vec!["time_series".to_string(), "forecasting".to_string()],
                    last_heartbeat: Instant::now(),
                },
            );
        }

        Self {
            config,
            nodes,
            task_queue: Vec::new(),
            running_tasks: HashMap::new(),
            completed_tasks: HashMap::new(),
            load_balancer_state: LoadBalancerState::default(),
        }
    }

    /// Queues a task for execution after validating that all of its
    /// dependencies are already completed or currently running.
    pub fn submit_task(&mut self, task: DistributedTask<F>) -> Result<()> {
        for dep_id in &task.dependencies {
            if !self.completed_tasks.contains_key(dep_id)
                && !self.running_tasks.contains_key(dep_id)
            {
                return Err(TimeSeriesError::InvalidInput(format!(
                    "Dependency task {dep_id} not found"
                )));
            }
        }

        // Keep the queue sorted by descending priority so the highest-priority
        // task sits at the front.
        let insert_pos = self
            .task_queue
            .binary_search_by(|t| t.priority.cmp(&task.priority).reverse())
            .unwrap_or_else(|pos| pos);

        self.task_queue.insert(insert_pos, task);
        Ok(())
    }

    /// Splits the series into chunks, forecasts each chunk as a separate task,
    /// and averages the per-chunk forecasts into a single horizon-length result.
    pub fn distributed_forecast(
        &mut self,
        data: &Array1<F>,
        horizon: usize,
        method: &str,
    ) -> Result<Array1<F>> {
        // Guard against a zero chunk size (e.g. when the series is shorter than
        // the number of nodes), which would otherwise panic in axis_chunks_iter.
        let chunk_size = self
            .config
            .chunk_size
            .min(data.len() / self.config.nodes.len().max(1))
            .max(1);
        let chunks: Vec<Array1<F>> = data
            .axis_chunks_iter(Axis(0), chunk_size)
            .map(|chunk| chunk.to_owned())
            .collect();

        let mut tasks = Vec::new();
        for (i, chunk) in chunks.iter().enumerate() {
            let mut params = HashMap::new();
            params.insert("horizon".to_string(), horizon as f64);
            params.insert("chunk_index".to_string(), i as f64);

            let task = DistributedTask {
                id: format!("forecast_chunk_{i}"),
                task_type: TaskType::Forecasting,
                input_data: chunk.clone(),
                parameters: params,
                priority: TaskPriority::Normal,
                dependencies: vec![],
            };

            tasks.push(task);
        }

        for task in tasks {
            self.submit_task(task)?;
        }

        self.process_pending_tasks()?;

        self.aggregate_forecast_results(horizon)
    }

    /// Extracts summary features from overlapping windows of the series, one
    /// task per window, and stacks the per-window feature vectors row-wise.
    pub fn distributed_feature_extraction(
        &mut self,
        data: &Array1<F>,
        features: &[String],
    ) -> Result<Array2<F>> {
        // Window size is capped at 1000 samples; .max(1) keeps the step positive
        // for very short series so the loop below always terminates.
        let window_size = 1000.min(data.len() / 2).max(1);
        let overlap = window_size / 4;
        let step = (window_size - overlap).max(1);

        let mut tasks = Vec::new();
        let mut i = 0;
        let mut start = 0;

        while start + window_size <= data.len() {
            let end = (start + window_size).min(data.len());
            let window = data.slice(scirs2_core::ndarray::s![start..end]).to_owned();

            let mut params = HashMap::new();
            params.insert("window_index".to_string(), i as f64);
            params.insert("window_size".to_string(), window_size as f64);

            let task = DistributedTask {
                id: format!("features_window_{i}"),
                task_type: TaskType::FeatureExtraction,
                input_data: window,
                parameters: params,
                priority: TaskPriority::Normal,
                dependencies: vec![],
            };

            tasks.push(task);
            start += step;
            i += 1;
        }

        for task in tasks {
            self.submit_task(task)?;
        }

        self.process_pending_tasks()?;

        self.aggregate_feature_results(features.len())
    }

    fn select_node_for_task(&mut self, task: &DistributedTask<F>) -> Result<String> {
        let available_nodes: Vec<&String> = self
            .nodes
            .iter()
            .filter(|(_, info)| {
                info.status == NodeStatus::Available
                    && info.running_tasks < self.config.max_concurrent_tasks
            })
            .map(|(address, _)| address)
            .collect();

        if available_nodes.is_empty() {
            return Err(TimeSeriesError::ComputationError(
                "No available nodes for task execution".to_string(),
            ));
        }

        let selected_node = match self.config.load_balancing {
            LoadBalancingStrategy::RoundRobin => {
                let index = self.load_balancer_state.round_robin_counter % available_nodes.len();
                self.load_balancer_state.round_robin_counter += 1;
                available_nodes[index].clone()
            }
            LoadBalancingStrategy::LoadBased => {
                // Pick the node with the lowest reported load.
                available_nodes
                    .iter()
                    .min_by(|a, b| {
                        let load_a = self.nodes.get(*a as &str).unwrap().current_load;
                        let load_b = self.nodes.get(*b as &str).unwrap().current_load;
                        load_a
                            .partial_cmp(&load_b)
                            .unwrap_or(std::cmp::Ordering::Equal)
                    })
                    .unwrap()
                    .to_string()
            }
            LoadBalancingStrategy::Random => {
                // Deterministic stand-in for random selection, based on the task id length.
                let hash = task.id.len() % available_nodes.len();
                available_nodes[hash].clone()
            }
            LoadBalancingStrategy::Weighted => {
                // Simplified weighted selection: take the first available node.
                available_nodes[0].clone()
            }
        };

        Ok(selected_node)
    }

    fn process_pending_tasks(&mut self) -> Result<()> {
        // The queue is kept in descending priority order, so take from the front.
        let mut deferred = 0;
        while !self.task_queue.is_empty() {
            let task = self.task_queue.remove(0);

            let dependencies_satisfied = task.dependencies.iter().all(|dep_id| {
                self.completed_tasks
                    .get(dep_id)
                    .map(|result| result.status == TaskStatus::Completed)
                    .unwrap_or(false)
            });

            if !dependencies_satisfied {
                // Requeue at the back; stop once every remaining task has been
                // deferred in a row, to avoid spinning on unsatisfiable dependencies.
                self.task_queue.push(task);
                deferred += 1;
                if deferred >= self.task_queue.len() {
                    break;
                }
                continue;
            }
            deferred = 0;

            let node_address = self.select_node_for_task(&task)?;

            let result = self.execute_task_on_node(&task, &node_address)?;

            self.completed_tasks.insert(task.id.clone(), result);
            self.running_tasks.remove(&task.id);
        }

        Ok(())
    }

    fn execute_task_on_node(
        &mut self,
        task: &DistributedTask<F>,
        node_address: &str,
    ) -> Result<TaskResult<F>> {
        let start_time = Instant::now();

        self.running_tasks.insert(task.id.clone(), task.clone());

        if let Some(node) = self.nodes.get_mut(node_address) {
            node.running_tasks += 1;
            node.current_load = node.running_tasks as f64 / self.config.max_concurrent_tasks as f64;
        }

        // Execution is simulated locally; a real deployment would dispatch the
        // task to the selected node over the network.
        let result_data = match task.task_type {
            TaskType::Forecasting => self.simulate_forecasting_task(task)?,
            TaskType::FeatureExtraction => self.simulate_feature_extraction_task(task)?,
            TaskType::AnomalyDetection => self.simulate_anomaly_detection_task(task)?,
            TaskType::Decomposition => self.simulate_decomposition_task(task)?,
            _ => {
                // Other task types are passed through unchanged.
                task.input_data.clone()
            }
        };

        let execution_time = start_time.elapsed();

        if let Some(node) = self.nodes.get_mut(node_address) {
            node.running_tasks = node.running_tasks.saturating_sub(1);
            node.current_load = node.running_tasks as f64 / self.config.max_concurrent_tasks as f64;
        }

        Ok(TaskResult {
            taskid: task.id.clone(),
            status: TaskStatus::Completed,
            data: Some(result_data),
            metrics: TaskMetrics {
                execution_time,
                executed_on: node_address.to_string(),
                memory_usage: task.input_data.len() * std::mem::size_of::<F>(),
                // Simulated utilization and network overhead.
                cpu_utilization: 0.8,
                network_time: Duration::from_millis(10),
            },
            error: None,
        })
    }

    fn simulate_forecasting_task(&self, task: &DistributedTask<F>) -> Result<Array1<F>> {
        let horizon = task
            .parameters
            .get("horizon")
            .map(|&h| h as usize)
            .unwrap_or(10);

        let data = &task.input_data;
        if data.len() < 2 {
            return Ok(Array1::zeros(horizon));
        }

        // Simple linear extrapolation from the last two observations.
        let slope = data[data.len() - 1] - data[data.len() - 2];
        let mut forecast = Array1::zeros(horizon);

        for i in 0..horizon {
            forecast[i] = data[data.len() - 1] + slope * F::from(i + 1).unwrap();
        }

        Ok(forecast)
    }

    fn simulate_feature_extraction_task(&self, task: &DistributedTask<F>) -> Result<Array1<F>> {
        let data = &task.input_data;

        let mean = data.mean().unwrap_or(F::zero());
        let variance = data.var(F::zero());
        let min = data.iter().fold(F::infinity(), |acc, &x| acc.min(x));
        let max = data.iter().fold(F::neg_infinity(), |acc, &x| acc.max(x));

        let features = vec![
            mean,
            variance.sqrt(), // standard deviation
            min,
            max,
            max - min, // range
        ];

        Ok(Array1::from_vec(features))
    }

    fn simulate_anomaly_detection_task(&self, task: &DistributedTask<F>) -> Result<Array1<F>> {
        let data = &task.input_data;
        let mean = data.mean().unwrap_or(F::zero());
        let std_dev = data.var(F::zero()).sqrt();

        // A constant series has zero spread and therefore no anomalies.
        if std_dev == F::zero() {
            return Ok(Array1::zeros(data.len()));
        }

        // Flag points whose |z-score| exceeds three standard deviations.
        let threshold = F::from(3.0).unwrap();
        let mut anomaly_scores = Array1::zeros(data.len());

        for (i, &value) in data.iter().enumerate() {
            let z_score = (value - mean) / std_dev;
            anomaly_scores[i] = if z_score.abs() > threshold {
                F::one()
            } else {
                F::zero()
            };
        }

        Ok(anomaly_scores)
    }

    fn simulate_decomposition_task(&self, task: &DistributedTask<F>) -> Result<Array1<F>> {
        // Centered moving-average trend over a window of at most 10 samples.
        let data = &task.input_data;
        let window_size = 10.min(data.len() / 2);
        let mut trend = Array1::zeros(data.len());

        for i in 0..data.len() {
            let start = i.saturating_sub(window_size / 2);
            let end = (i + window_size / 2 + 1).min(data.len());

            let window_sum = data.slice(scirs2_core::ndarray::s![start..end]).sum();
            let window_len = F::from(end - start).unwrap();
            trend[i] = window_sum / window_len;
        }

        Ok(trend)
    }

    fn aggregate_forecast_results(&self, horizon: usize) -> Result<Array1<F>> {
        let mut all_forecasts = Vec::new();
        let mut chunk_indices = Vec::new();

        for (taskid, result) in &self.completed_tasks {
            if taskid.starts_with("forecast_chunk_") && result.status == TaskStatus::Completed {
                if let Some(data) = &result.data {
                    all_forecasts.push(data.clone());

                    if let Some(chunk_str) = taskid.strip_prefix("forecast_chunk_") {
                        if let Ok(index) = chunk_str.parse::<usize>() {
                            chunk_indices.push(index);
                        }
                    }
                }
            }
        }

        if all_forecasts.is_empty() {
            return Ok(Array1::zeros(horizon));
        }

        let mut indexed_forecasts: Vec<(usize, Array1<F>)> =
            chunk_indices.into_iter().zip(all_forecasts).collect();
        indexed_forecasts.sort_by_key(|(index, _)| *index);

        let mut final_forecast = Array1::zeros(horizon);
        let mut count = 0;

        for (_, forecast) in indexed_forecasts {
            let actual_horizon = forecast.len().min(horizon);
            for i in 0..actual_horizon {
                final_forecast[i] = final_forecast[i] + forecast[i];
            }
            count += 1;
        }

        if count > 0 {
            final_forecast = final_forecast / F::from(count).unwrap();
        }

        Ok(final_forecast)
    }

    fn aggregate_feature_results(&self, num_features: usize) -> Result<Array2<F>> {
        let mut all_features = Vec::new();
        let mut window_indices = Vec::new();

        for (taskid, result) in &self.completed_tasks {
            if taskid.starts_with("features_window_") && result.status == TaskStatus::Completed {
                if let Some(data) = &result.data {
                    all_features.push(data.clone());

                    if let Some(window_str) = taskid.strip_prefix("features_window_") {
                        if let Ok(index) = window_str.parse::<usize>() {
                            window_indices.push(index);
                        }
                    }
                }
            }
        }

        if all_features.is_empty() {
            return Ok(Array2::zeros((0, num_features)));
        }

        let mut indexed_features: Vec<(usize, Array1<F>)> =
            window_indices.into_iter().zip(all_features).collect();
        indexed_features.sort_by_key(|(index, _)| *index);

        let num_windows = indexed_features.len();
        let feature_size = indexed_features[0].1.len().min(num_features);
        let mut result = Array2::zeros((num_windows, feature_size));

        for (row, (_, features)) in indexed_features.iter().enumerate() {
            for col in 0..feature_size {
                if col < features.len() {
                    result[[row, col]] = features[col];
                }
            }
        }

        Ok(result)
    }

    /// Returns a snapshot of node availability, task counts, and average load.
    pub fn get_cluster_status(&self) -> ClusterStatus {
        let total_nodes = self.nodes.len();
        let available_nodes = self
            .nodes
            .values()
            .filter(|node| node.status == NodeStatus::Available)
            .count();

        let total_running_tasks = self.running_tasks.len();
        let total_completed_tasks = self.completed_tasks.len();
        let total_queued_tasks = self.task_queue.len();

        let average_load = if total_nodes > 0 {
            self.nodes
                .values()
                .map(|node| node.current_load)
                .sum::<f64>()
                / total_nodes as f64
        } else {
            0.0
        };

        ClusterStatus {
            total_nodes,
            available_nodes,
            total_running_tasks,
            total_completed_tasks,
            total_queued_tasks,
            average_load,
            nodes: self.nodes.clone(),
        }
    }

    pub fn clear_completed_tasks(&mut self) {
        self.completed_tasks.clear();
    }

    pub fn cancel_task(&mut self, taskid: &str) -> Result<()> {
        if let Some(_task) = self.running_tasks.remove(taskid) {
            self.completed_tasks.insert(
                taskid.to_string(),
                TaskResult {
                    taskid: taskid.to_string(),
                    status: TaskStatus::Cancelled,
                    data: None,
                    metrics: TaskMetrics::default(),
                    error: Some("Task cancelled by user".to_string()),
                },
            );
            Ok(())
        } else {
            Err(TimeSeriesError::InvalidInput(format!(
                "Task {taskid} not found in running tasks"
            )))
        }
    }
}

/// Aggregate view of cluster health and workload.
#[derive(Debug, Clone)]
pub struct ClusterStatus {
    pub total_nodes: usize,
    pub available_nodes: usize,
    pub total_running_tasks: usize,
    pub total_completed_tasks: usize,
    pub total_queued_tasks: usize,
    pub average_load: f64,
    pub nodes: HashMap<String, NodeInfo>,
}

/// Convenience helper that runs a moving-average computation as a single
/// `Custom` task on the cluster and returns its output.
#[allow(dead_code)]
pub fn distributed_moving_average<
    F: Float
        + Debug
        + Clone
        + scirs2_core::numeric::FromPrimitive
        + scirs2_core::numeric::Zero
        + scirs2_core::ndarray::ScalarOperand,
>(
    processor: &mut DistributedProcessor<F>,
    data: &Array1<F>,
    window_size: usize,
) -> Result<Array1<F>> {
    let task = DistributedTask {
        id: "moving_average".to_string(),
        task_type: TaskType::Custom("moving_average".to_string()),
        input_data: data.clone(),
        parameters: {
            let mut params = HashMap::new();
            params.insert("window_size".to_string(), window_size as f64);
            params
        },
        priority: TaskPriority::Normal,
        dependencies: vec![],
    };

    processor.submit_task(task)?;
    processor.process_pending_tasks()?;

    if let Some(result) = processor.completed_tasks.get("moving_average") {
        if let Some(data) = &result.data {
            Ok(data.clone())
        } else {
            Err(TimeSeriesError::ComputationError(
                "Moving average computation failed".to_string(),
            ))
        }
    } else {
        Err(TimeSeriesError::ComputationError(
            "Moving average task not found".to_string(),
        ))
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_cluster_config_default() {
        let config = ClusterConfig::default();
        assert_eq!(config.nodes.len(), 1);
        assert_eq!(config.max_concurrent_tasks, 4);
        assert_eq!(config.chunk_size, 10000);
    }

    #[test]
    fn test_distributed_processor_creation() {
        let config = ClusterConfig::default();
        let processor: DistributedProcessor<f64> = DistributedProcessor::new(config);

        assert_eq!(processor.nodes.len(), 1);
        assert!(processor.task_queue.is_empty());
        assert!(processor.running_tasks.is_empty());
        assert!(processor.completed_tasks.is_empty());
    }

    #[test]
    fn test_task_submission() {
        let config = ClusterConfig::default();
        let mut processor: DistributedProcessor<f64> = DistributedProcessor::new(config);

        let task = DistributedTask {
            id: "test_task".to_string(),
            task_type: TaskType::Forecasting,
            input_data: Array1::from_vec(vec![1.0, 2.0, 3.0, 4.0, 5.0]),
            parameters: HashMap::new(),
            priority: TaskPriority::Normal,
            dependencies: vec![],
        };

        assert!(processor.submit_task(task).is_ok());
        assert_eq!(processor.task_queue.len(), 1);
    }
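
    // Hedged companion example: submit_task is expected to reject a task whose
    // dependency is neither completed nor running. The ids used here are
    // illustrative values, not part of the library's API.
    #[test]
    fn test_task_submission_missing_dependency() {
        let config = ClusterConfig::default();
        let mut processor: DistributedProcessor<f64> = DistributedProcessor::new(config);

        let task = DistributedTask {
            id: "dependent_task".to_string(),
            task_type: TaskType::Forecasting,
            input_data: Array1::from_vec(vec![1.0, 2.0, 3.0]),
            parameters: HashMap::new(),
            priority: TaskPriority::Normal,
            dependencies: vec!["missing_dependency".to_string()],
        };

        assert!(processor.submit_task(task).is_err());
        assert!(processor.task_queue.is_empty());
    }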

    #[test]
    fn test_task_priority_ordering() {
        let config = ClusterConfig::default();
        let mut processor: DistributedProcessor<f64> = DistributedProcessor::new(config);

        let low_task = DistributedTask {
            id: "low".to_string(),
            task_type: TaskType::Forecasting,
            input_data: Array1::zeros(10),
            parameters: HashMap::new(),
            priority: TaskPriority::Low,
            dependencies: vec![],
        };

        let high_task = DistributedTask {
            id: "high".to_string(),
            task_type: TaskType::Forecasting,
            input_data: Array1::zeros(10),
            parameters: HashMap::new(),
            priority: TaskPriority::High,
            dependencies: vec![],
        };

        processor.submit_task(low_task).unwrap();
        processor.submit_task(high_task).unwrap();

        assert_eq!(processor.task_queue[0].priority, TaskPriority::High);
        assert_eq!(processor.task_queue[1].priority, TaskPriority::Low);
    }
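
    // Hedged example of a non-default ClusterConfig. The node addresses are
    // placeholders; the current processor only simulates execution and never
    // opens a network connection to them.
    #[test]
    fn test_custom_cluster_config() {
        let config = ClusterConfig {
            nodes: vec!["10.0.0.1:8080".to_string(), "10.0.0.2:8080".to_string()],
            max_concurrent_tasks: 8,
            task_timeout: Duration::from_secs(60),
            chunk_size: 5000,
            load_balancing: LoadBalancingStrategy::LoadBased,
            fault_tolerance: FaultToleranceConfig::default(),
        };

        let processor: DistributedProcessor<f64> = DistributedProcessor::new(config);
        assert_eq!(processor.nodes.len(), 2);

        let status = processor.get_cluster_status();
        assert_eq!(status.available_nodes, 2);
        assert_eq!(status.total_queued_tasks, 0);
    }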

    #[test]
    fn test_distributed_forecasting() {
        let config = ClusterConfig::default();
        let mut processor: DistributedProcessor<f64> = DistributedProcessor::new(config);

        let data = Array1::from_vec((1..100).map(|x| x as f64).collect());
        let horizon = 10;

        let result = processor.distributed_forecast(&data, horizon, "linear");
        assert!(result.is_ok());

        let forecast = result.unwrap();
        assert_eq!(forecast.len(), horizon);
    }
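
    // Hedged end-to-end example for distributed_feature_extraction. The window
    // length, overlap, and the five simulated features (mean, std dev, min, max,
    // range) follow the simulator in this module, not a production feature set.
    #[test]
    fn test_distributed_feature_extraction() {
        let config = ClusterConfig::default();
        let mut processor: DistributedProcessor<f64> = DistributedProcessor::new(config);

        let data = Array1::from_vec((0..200).map(|x| (x as f64).sin()).collect());
        let features = vec![
            "mean".to_string(),
            "std_dev".to_string(),
            "min".to_string(),
            "max".to_string(),
            "range".to_string(),
        ];

        let result = processor.distributed_feature_extraction(&data, &features);
        assert!(result.is_ok());

        let matrix = result.unwrap();
        assert!(matrix.nrows() > 0);
        assert_eq!(matrix.ncols(), features.len());
    }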

    #[test]
    fn test_cluster_status() {
        let config = ClusterConfig::default();
        let processor: DistributedProcessor<f64> = DistributedProcessor::new(config);

        let status = processor.get_cluster_status();
        assert_eq!(status.total_nodes, 1);
        assert_eq!(status.available_nodes, 1);
        assert_eq!(status.total_running_tasks, 0);
        assert_eq!(status.total_completed_tasks, 0);
        assert_eq!(status.total_queued_tasks, 0);
    }
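
    // Hedged example of the cancellation error path: cancelling an id that is
    // not currently running should fail. The id below is arbitrary.
    #[test]
    fn test_cancel_unknown_task() {
        let config = ClusterConfig::default();
        let mut processor: DistributedProcessor<f64> = DistributedProcessor::new(config);

        assert!(processor.cancel_task("does_not_exist").is_err());
        assert!(processor.completed_tasks.is_empty());
    }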

    #[test]
    fn test_load_balancing_strategies() {
        assert_ne!(
            LoadBalancingStrategy::RoundRobin,
            LoadBalancingStrategy::LoadBased
        );
        assert_ne!(
            LoadBalancingStrategy::Random,
            LoadBalancingStrategy::Weighted
        );
    }

    #[test]
    fn test_task_status_enum() {
        assert_eq!(TaskStatus::Pending, TaskStatus::Pending);
        assert_ne!(TaskStatus::Running, TaskStatus::Completed);
    }
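
    // Hedged example for the free helper distributed_moving_average. It routes
    // through TaskType::Custom, which the simulated executor passes through
    // unchanged, so the output currently mirrors the input; a real worker would
    // compute the windowed average remotely.
    #[test]
    fn test_distributed_moving_average_passthrough() {
        let config = ClusterConfig::default();
        let mut processor: DistributedProcessor<f64> = DistributedProcessor::new(config);

        let data = Array1::from_vec(vec![1.0, 2.0, 3.0, 4.0, 5.0]);
        let result = distributed_moving_average(&mut processor, &data, 3);

        assert!(result.is_ok());
        assert_eq!(result.unwrap().len(), data.len());
    }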

    #[test]
    fn test_fault_tolerance_config() {
        let config = FaultToleranceConfig::default();
        assert_eq!(config.max_retries, 3);
        assert_eq!(config.replication_factor, 2);
        assert!(!config.enable_replication);
    }
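
    // Hedged example: clear_completed_tasks empties the result cache so that
    // results from earlier submissions do not leak into later aggregations.
    #[test]
    fn test_clear_completed_tasks() {
        let config = ClusterConfig::default();
        let mut processor: DistributedProcessor<f64> = DistributedProcessor::new(config);

        let data = Array1::from_vec((1..50).map(|x| x as f64).collect());
        processor.distributed_forecast(&data, 5, "linear").unwrap();
        assert!(!processor.completed_tasks.is_empty());

        processor.clear_completed_tasks();
        assert!(processor.completed_tasks.is_empty());
    }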
}