use scirs2_core::ndarray::ArrayStatCompat;
use scirs2_core::ndarray::{Array1, Array2, Axis};
use scirs2_core::numeric::Float;
use std::collections::HashMap;
use std::fmt::Debug;
use std::time::{Duration, Instant};

use crate::error::{Result, TimeSeriesError};
use statrs::statistics::Statistics;
/// Configuration for a distributed processing cluster.
#[derive(Debug, Clone)]
pub struct ClusterConfig {
    /// Addresses of the worker nodes in the cluster.
    pub nodes: Vec<String>,
    /// Maximum number of tasks a single node may run concurrently.
    pub max_concurrent_tasks: usize,
    /// Timeout applied to each submitted task.
    pub task_timeout: Duration,
    /// Number of data points per distributed chunk.
    pub chunk_size: usize,
    /// Strategy used to assign tasks to nodes.
    pub load_balancing: LoadBalancingStrategy,
    /// Retry, replication, and failure-detection settings.
    pub fault_tolerance: FaultToleranceConfig,
}

impl Default for ClusterConfig {
    fn default() -> Self {
        Self {
            nodes: vec!["localhost:8080".to_string()],
            max_concurrent_tasks: 4,
            task_timeout: Duration::from_secs(30),
            chunk_size: 10000,
            load_balancing: LoadBalancingStrategy::RoundRobin,
            fault_tolerance: FaultToleranceConfig::default(),
        }
    }
}
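
// A minimal configuration sketch (hypothetical addresses): override only
// the fields that differ from the defaults.
//
//     let config = ClusterConfig {
//         nodes: vec!["10.0.0.1:8080".into(), "10.0.0.2:8080".into()],
//         task_timeout: Duration::from_secs(60),
//         ..ClusterConfig::default()
//     };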

/// Strategy for distributing tasks across nodes.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum LoadBalancingStrategy {
    /// Cycle through nodes in a fixed order.
    RoundRobin,
    /// Prefer the node with the lowest current load.
    LoadBased,
    /// Pick a node pseudo-randomly.
    Random,
    /// Pick nodes according to configured weights.
    Weighted,
}

/// Fault-tolerance settings for the cluster.
#[derive(Debug, Clone)]
pub struct FaultToleranceConfig {
    /// Maximum number of retries per failed task.
    pub max_retries: usize,
    /// Delay between successive retries.
    pub retry_delay: Duration,
    /// Whether task replication is enabled.
    pub enable_replication: bool,
    /// Number of replicas per task when replication is enabled.
    pub replication_factor: usize,
    /// How long to wait for a heartbeat before declaring a node failed.
    pub failure_detection_timeout: Duration,
}

impl Default for FaultToleranceConfig {
    fn default() -> Self {
        Self {
            max_retries: 3,
            retry_delay: Duration::from_millis(500),
            enable_replication: false,
            replication_factor: 2,
            failure_detection_timeout: Duration::from_secs(10),
        }
    }
}

/// A unit of work to be executed somewhere in the cluster.
#[derive(Debug, Clone)]
pub struct DistributedTask<F: Float> {
    /// Unique identifier for the task.
    pub id: String,
    /// Kind of computation to perform.
    pub task_type: TaskType,
    /// Input time series data.
    pub input_data: Array1<F>,
    /// Named scalar parameters (e.g. horizon, window size).
    pub parameters: HashMap<String, f64>,
    /// Scheduling priority.
    pub priority: TaskPriority,
    /// IDs of tasks that must complete before this one runs.
    pub dependencies: Vec<String>,
}

/// The kinds of computations the cluster can run.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum TaskType {
    /// Time series forecasting.
    Forecasting,
    /// Trend/seasonal decomposition.
    Decomposition,
    /// Summary feature extraction.
    FeatureExtraction,
    /// Anomaly detection.
    AnomalyDetection,
    /// Cross-correlation analysis.
    CrossCorrelation,
    /// Fourier transform.
    FourierTransform,
    /// Wavelet transform.
    WaveletTransform,
    /// A user-defined task identified by name.
    Custom(String),
}

/// Scheduling priority; higher values are dequeued first.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub enum TaskPriority {
    Low = 1,
    Normal = 2,
    High = 3,
    Critical = 4,
}

/// Result of a task executed on a node.
#[derive(Debug, Clone)]
pub struct TaskResult<F: Float> {
    /// ID of the task this result belongs to.
    pub taskid: String,
    /// Final status of the task.
    pub status: TaskStatus,
    /// Output data, if the task produced any.
    pub data: Option<Array1<F>>,
    /// Execution metrics collected on the node.
    pub metrics: TaskMetrics,
    /// Error message, if the task failed.
    pub error: Option<String>,
}

/// Lifecycle states of a distributed task.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TaskStatus {
    Pending,
    Running,
    Completed,
    Failed,
    Cancelled,
    Timeout,
}

/// Metrics describing how a task executed.
#[derive(Debug, Clone)]
pub struct TaskMetrics {
    /// Wall-clock execution time.
    pub execution_time: Duration,
    /// Address of the node the task ran on.
    pub executed_on: String,
    /// Approximate memory used, in bytes.
    pub memory_usage: usize,
    /// Fraction of CPU utilized during execution.
    pub cpu_utilization: f64,
    /// Time spent in network transfer.
    pub network_time: Duration,
}

impl Default for TaskMetrics {
    fn default() -> Self {
        Self {
            execution_time: Duration::ZERO,
            executed_on: String::new(),
            memory_usage: 0,
            cpu_utilization: 0.0,
            network_time: Duration::ZERO,
        }
    }
}

/// Static and dynamic information about a cluster node.
#[derive(Debug, Clone)]
pub struct NodeInfo {
    /// Network address of the node.
    pub address: String,
    /// Current availability status.
    pub status: NodeStatus,
    /// Number of CPU cores.
    pub cpu_cores: usize,
    /// Total memory in bytes.
    pub total_memory: usize,
    /// Currently available memory in bytes.
    pub available_memory: usize,
    /// Current load as a fraction of capacity.
    pub current_load: f64,
    /// Number of tasks currently running on this node.
    pub running_tasks: usize,
    /// Capabilities advertised by the node.
    pub capabilities: Vec<String>,
    /// Time of the last heartbeat received from the node.
    pub last_heartbeat: Instant,
}

/// Availability states of a cluster node.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum NodeStatus {
    Available,
    Busy,
    Offline,
    Maintenance,
    Failed,
}

/// Coordinator that schedules and, in this module, simulates execution of
/// distributed time series tasks across the configured nodes.
pub struct DistributedProcessor<
    F: Float
        + Debug
        + Clone
        + scirs2_core::numeric::FromPrimitive
        + scirs2_core::numeric::Zero
        + scirs2_core::ndarray::ScalarOperand,
> {
    /// Cluster configuration.
    config: ClusterConfig,
    /// Known nodes, keyed by address.
    nodes: HashMap<String, NodeInfo>,
    /// Tasks waiting to run, ordered by descending priority.
    task_queue: Vec<DistributedTask<F>>,
    /// Tasks currently executing, keyed by task ID.
    running_tasks: HashMap<String, DistributedTask<F>>,
    /// Finished tasks and their results, keyed by task ID.
    completed_tasks: HashMap<String, TaskResult<F>>,
    /// Internal state used by the load-balancing strategies.
    load_balancer_state: LoadBalancerState,
}

#[derive(Debug, Default)]
struct LoadBalancerState {
    /// Counter for the round-robin strategy.
    round_robin_counter: usize,
    /// Per-node weights for the weighted strategy.
    #[allow(dead_code)]
    node_weights: HashMap<String, f64>,
    /// Recent load samples per node.
    #[allow(dead_code)]
    load_history: HashMap<String, Vec<f64>>,
}

impl<
        F: Float
            + Debug
            + Clone
            + scirs2_core::numeric::FromPrimitive
            + scirs2_core::numeric::Zero
            + scirs2_core::ndarray::ScalarOperand,
    > DistributedProcessor<F>
{
    /// Creates a processor for the given cluster configuration, registering
    /// every configured node with a placeholder hardware profile.
    pub fn new(config: ClusterConfig) -> Self {
        let mut nodes = HashMap::new();

        for address in &config.nodes {
            nodes.insert(
                address.clone(),
                NodeInfo {
                    address: address.clone(),
                    status: NodeStatus::Available,
                    // Placeholder hardware profile; a real deployment would
                    // discover these values from the node itself.
                    cpu_cores: 4,
                    total_memory: 8 * 1024 * 1024 * 1024,
                    available_memory: 6 * 1024 * 1024 * 1024,
                    current_load: 0.0,
                    running_tasks: 0,
                    capabilities: vec!["time_series".to_string(), "forecasting".to_string()],
                    last_heartbeat: Instant::now(),
                },
            );
        }

        Self {
            config,
            nodes,
            task_queue: Vec::new(),
            running_tasks: HashMap::new(),
            completed_tasks: HashMap::new(),
            load_balancer_state: LoadBalancerState::default(),
        }
    }

    /// Queues a task for execution, keeping the queue sorted by descending
    /// priority. Fails if a declared dependency is unknown to the processor.
    pub fn submit_task(&mut self, task: DistributedTask<F>) -> Result<()> {
        for dep_id in &task.dependencies {
            // A dependency is acceptable if it has already completed, is
            // currently running, or is still waiting in the queue.
            let known = self.completed_tasks.contains_key(dep_id)
                || self.running_tasks.contains_key(dep_id)
                || self.task_queue.iter().any(|t| &t.id == dep_id);
            if !known {
                return Err(TimeSeriesError::InvalidInput(format!(
                    "Dependency task {dep_id} not found"
                )));
            }
        }

        // Binary search over the descending-priority queue to find an
        // insertion point that preserves the ordering.
        let insert_pos = self
            .task_queue
            .binary_search_by(|t| t.priority.cmp(&task.priority).reverse())
            .unwrap_or_else(|pos| pos);

        self.task_queue.insert(insert_pos, task);
        Ok(())
    }
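
    // Illustrative submission order (hypothetical IDs): a dependent task can
    // be queued as soon as its dependency is queued, running, or completed.
    //
    //     processor.submit_task(make_task("decompose", vec![]))?;
    //     processor.submit_task(make_task("forecast", vec!["decompose".into()]))?;
    //
    // `make_task` is a hypothetical helper that fills in the remaining
    // DistributedTask fields.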

    /// Splits the series into chunks, forecasts each chunk as a separate
    /// task, and averages the per-chunk forecasts into a single result.
    /// `_method` is reserved for selecting the forecasting method and is
    /// currently unused by the simulated executor.
    pub fn distributed_forecast(
        &mut self,
        data: &Array1<F>,
        horizon: usize,
        _method: &str,
    ) -> Result<Array1<F>> {
        // Aim for one chunk per node, capped by the configured chunk size;
        // clamp to at least 1 so axis_chunks_iter never sees a zero width.
        let chunk_size = self
            .config
            .chunk_size
            .min(data.len() / self.config.nodes.len().max(1))
            .max(1);
        let chunks: Vec<Array1<F>> = data
            .axis_chunks_iter(Axis(0), chunk_size)
            .map(|chunk| chunk.to_owned())
            .collect();

        // Create one forecasting task per chunk.
        let mut tasks = Vec::new();
        for (i, chunk) in chunks.iter().enumerate() {
            let mut params = HashMap::new();
            params.insert("horizon".to_string(), horizon as f64);
            params.insert("chunk_index".to_string(), i as f64);

            let task = DistributedTask {
                id: format!("forecast_chunk_{i}"),
                task_type: TaskType::Forecasting,
                input_data: chunk.clone(),
                parameters: params,
                priority: TaskPriority::Normal,
                dependencies: vec![],
            };

            tasks.push(task);
        }

        for task in tasks {
            self.submit_task(task)?;
        }

        self.process_pending_tasks()?;

        self.aggregate_forecast_results(horizon)
    }
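
    // Worked sizing example (illustrative numbers): with 100_000 points,
    // one configured node, and chunk_size 10_000, the series splits into
    // ten chunks; each produces a horizon-length forecast, and
    // aggregate_forecast_results() averages them element-wise.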

    /// Extracts features over sliding windows, one task per window, and
    /// stacks the per-window feature vectors into a matrix.
    pub fn distributed_feature_extraction(
        &mut self,
        data: &Array1<F>,
        features: &[String],
    ) -> Result<Array2<F>> {
        // Overlapping windows: 25% overlap between consecutive windows.
        let window_size = 1000.min(data.len() / 2);
        if window_size == 0 {
            // Not enough data for even a single window; a zero step would
            // otherwise loop forever below.
            return Ok(Array2::zeros((0, features.len())));
        }
        let overlap = window_size / 4;
        let step = window_size - overlap;

        let mut tasks = Vec::new();
        let mut i = 0;
        let mut start = 0;

        while start + window_size <= data.len() {
            let end = (start + window_size).min(data.len());
            let window = data.slice(scirs2_core::ndarray::s![start..end]).to_owned();

            let mut params = HashMap::new();
            params.insert("window_index".to_string(), i as f64);
            params.insert("window_size".to_string(), window_size as f64);

            let task = DistributedTask {
                id: format!("features_window_{i}"),
                task_type: TaskType::FeatureExtraction,
                input_data: window,
                parameters: params,
                priority: TaskPriority::Normal,
                dependencies: vec![],
            };

            tasks.push(task);
            start += step;
            i += 1;
        }

        for task in tasks {
            self.submit_task(task)?;
        }

        self.process_pending_tasks()?;

        self.aggregate_feature_results(features.len())
    }
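
    // Windowing arithmetic for the method above: with window_size 1000 the
    // overlap is 250 and the step 750, so windows start at 0, 750, 1500, ...
    // and any trailing partial window is skipped.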

    /// Picks a node for the task according to the configured strategy.
    fn select_node_for_task(&mut self, task: &DistributedTask<F>) -> Result<String> {
        let available_nodes: Vec<&String> = self
            .nodes
            .iter()
            .filter(|(_, info)| {
                info.status == NodeStatus::Available
                    && info.running_tasks < self.config.max_concurrent_tasks
            })
            .map(|(address, _)| address)
            .collect();

        if available_nodes.is_empty() {
            return Err(TimeSeriesError::ComputationError(
                "No available nodes for task execution".to_string(),
            ));
        }

        let selected_node = match self.config.load_balancing {
            LoadBalancingStrategy::RoundRobin => {
                let index = self.load_balancer_state.round_robin_counter % available_nodes.len();
                self.load_balancer_state.round_robin_counter += 1;
                available_nodes[index].clone()
            }
            LoadBalancingStrategy::LoadBased => available_nodes
                .iter()
                .min_by(|a, b| {
                    let load_a = self.nodes[a.as_str()].current_load;
                    let load_b = self.nodes[b.as_str()].current_load;
                    load_a
                        .partial_cmp(&load_b)
                        .unwrap_or(std::cmp::Ordering::Equal)
                })
                .expect("available_nodes is non-empty")
                .to_string(),
            LoadBalancingStrategy::Random => {
                // Cheap deterministic stand-in for a random pick: hash the
                // task ID length into the node list.
                let hash = task.id.len() % available_nodes.len();
                available_nodes[hash].clone()
            }
            LoadBalancingStrategy::Weighted => {
                // Weighted selection is not implemented yet; fall back to
                // the first available node.
                available_nodes[0].clone()
            }
        };

        Ok(selected_node)
    }

    /// Executes queued tasks whose dependencies are satisfied. In this
    /// module execution is simulated synchronously, in priority order.
    fn process_pending_tasks(&mut self) -> Result<()> {
        let mut deferred = Vec::new();

        // Take tasks from the front of the queue (highest priority first).
        while !self.task_queue.is_empty() {
            let task = self.task_queue.remove(0);

            let dependencies_satisfied = task.dependencies.iter().all(|dep_id| {
                self.completed_tasks
                    .get(dep_id)
                    .map(|result| result.status == TaskStatus::Completed)
                    .unwrap_or(false)
            });

            if !dependencies_satisfied {
                // Defer rather than re-queue immediately, so a task with an
                // unsatisfied dependency cannot spin this loop forever.
                deferred.push(task);
                continue;
            }

            let node_address = self.select_node_for_task(&task)?;

            let result = self.execute_task_on_node(&task, &node_address)?;

            self.completed_tasks.insert(task.id.clone(), result);
            self.running_tasks.remove(&task.id);
        }

        // Put deferred tasks back, preserving their relative priority order.
        self.task_queue = deferred;

        Ok(())
    }

    /// Simulates executing a task on the chosen node, updating that node's
    /// load accounting before and after the run.
    fn execute_task_on_node(
        &mut self,
        task: &DistributedTask<F>,
        node_address: &str,
    ) -> Result<TaskResult<F>> {
        let start_time = Instant::now();

        self.running_tasks.insert(task.id.clone(), task.clone());

        if let Some(node) = self.nodes.get_mut(node_address) {
            node.running_tasks += 1;
            node.current_load =
                node.running_tasks as f64 / self.config.max_concurrent_tasks as f64;
        }

        let result_data = match task.task_type {
            TaskType::Forecasting => self.simulate_forecasting_task(task)?,
            TaskType::FeatureExtraction => self.simulate_feature_extraction_task(task)?,
            TaskType::AnomalyDetection => self.simulate_anomaly_detection_task(task)?,
            TaskType::Decomposition => self.simulate_decomposition_task(task)?,
            _ => {
                // Remaining task types are not simulated yet; pass the
                // input through unchanged.
                task.input_data.clone()
            }
        };

        let execution_time = start_time.elapsed();

        if let Some(node) = self.nodes.get_mut(node_address) {
            node.running_tasks = node.running_tasks.saturating_sub(1);
            node.current_load =
                node.running_tasks as f64 / self.config.max_concurrent_tasks as f64;
        }

        Ok(TaskResult {
            taskid: task.id.clone(),
            status: TaskStatus::Completed,
            data: Some(result_data),
            metrics: TaskMetrics {
                execution_time,
                executed_on: node_address.to_string(),
                memory_usage: task.input_data.len() * std::mem::size_of::<F>(),
                // Placeholder metrics for the simulated run.
                cpu_utilization: 0.8,
                network_time: Duration::from_millis(10),
            },
            error: None,
        })
    }
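
    // In a real deployment, execute_task_on_node would serialize the task,
    // send it to `node_address` over the network, and await the result
    // within `config.task_timeout`, retrying per the fault-tolerance
    // settings. The simulate_* methods below stand in for that remote
    // execution so the scheduling logic can be exercised in-process.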

    /// Simulated forecast: linear extrapolation from the last two points.
    fn simulate_forecasting_task(&self, task: &DistributedTask<F>) -> Result<Array1<F>> {
        let horizon = task
            .parameters
            .get("horizon")
            .map(|&h| h as usize)
            .unwrap_or(10);

        let data = &task.input_data;
        if data.len() < 2 {
            return Ok(Array1::zeros(horizon));
        }

        // Slope between the last two observations (unit time step).
        let slope = data[data.len() - 1] - data[data.len() - 2];
        let mut forecast = Array1::zeros(horizon);

        for i in 0..horizon {
            forecast[i] =
                data[data.len() - 1] + slope * F::from(i + 1).expect("Failed to convert to float");
        }

        Ok(forecast)
    }
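
    // Worked example for the linear extrapolation above (illustrative
    // numbers): with input [2.0, 5.0] the slope is 3.0, so a horizon of 3
    // yields [8.0, 11.0, 14.0].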

    /// Simulated feature extraction: five summary statistics per window.
    fn simulate_feature_extraction_task(&self, task: &DistributedTask<F>) -> Result<Array1<F>> {
        let data = &task.input_data;

        let mean = data.mean_or(F::zero());
        let variance = data.var(F::zero());
        let min = data.iter().fold(F::infinity(), |acc, &x| acc.min(x));
        let max = data.iter().fold(F::neg_infinity(), |acc, &x| acc.max(x));

        let features = vec![
            mean,
            variance.sqrt(), // standard deviation
            min,
            max,
            max - min, // range
        ];

        Ok(Array1::from_vec(features))
    }

    /// Simulated anomaly detection: flags points more than three standard
    /// deviations from the mean.
    fn simulate_anomaly_detection_task(&self, task: &DistributedTask<F>) -> Result<Array1<F>> {
        let data = &task.input_data;
        let mean = data.mean_or(F::zero());
        let std_dev = data.var(F::zero()).sqrt();

        let mut anomaly_scores = Array1::zeros(data.len());

        // A constant series has zero spread, hence no anomalies; this also
        // avoids dividing by a zero standard deviation below.
        if std_dev == F::zero() {
            return Ok(anomaly_scores);
        }

        let threshold = F::from(3.0).expect("Failed to convert constant to float");

        for (i, &value) in data.iter().enumerate() {
            let z_score = (value - mean) / std_dev;
            anomaly_scores[i] = if z_score.abs() > threshold {
                F::one()
            } else {
                F::zero()
            };
        }

        Ok(anomaly_scores)
    }

    /// Simulated decomposition: extracts the trend with a centered moving
    /// average.
    fn simulate_decomposition_task(&self, task: &DistributedTask<F>) -> Result<Array1<F>> {
        let data = &task.input_data;
        let window_size = 10.min(data.len() / 2);
        let mut trend = Array1::zeros(data.len());

        for i in 0..data.len() {
            let start = i.saturating_sub(window_size / 2);
            let end = (i + window_size / 2 + 1).min(data.len());

            let window_sum = data.slice(scirs2_core::ndarray::s![start..end]).sum();
            let window_len = F::from(end - start).expect("Failed to convert to float");
            trend[i] = window_sum / window_len;
        }

        Ok(trend)
    }

    /// Collects per-chunk forecast results and averages them element-wise
    /// into a single horizon-length forecast.
    fn aggregate_forecast_results(&self, horizon: usize) -> Result<Array1<F>> {
        let mut all_forecasts = Vec::new();
        let mut chunk_indices = Vec::new();

        for (taskid, result) in &self.completed_tasks {
            if taskid.starts_with("forecast_chunk_") && result.status == TaskStatus::Completed {
                if let Some(data) = &result.data {
                    all_forecasts.push(data.clone());

                    if let Some(chunk_str) = taskid.strip_prefix("forecast_chunk_") {
                        if let Ok(index) = chunk_str.parse::<usize>() {
                            chunk_indices.push(index);
                        }
                    }
                }
            }
        }

        if all_forecasts.is_empty() {
            return Ok(Array1::zeros(horizon));
        }

        // Restore chunk order (HashMap iteration order is arbitrary).
        let mut indexed_forecasts: Vec<(usize, Array1<F>)> =
            chunk_indices.into_iter().zip(all_forecasts).collect();
        indexed_forecasts.sort_by_key(|(index_, _)| *index_);

        // Average the per-chunk forecasts element-wise.
        let mut final_forecast = Array1::zeros(horizon);
        let mut count = 0;

        for (_, forecast) in indexed_forecasts {
            let actual_horizon = forecast.len().min(horizon);
            for i in 0..actual_horizon {
                final_forecast[i] = final_forecast[i] + forecast[i];
            }
            count += 1;
        }

        if count > 0 {
            final_forecast = final_forecast / F::from(count).expect("Failed to convert to float");
        }

        Ok(final_forecast)
    }

    /// Stacks per-window feature vectors into a (windows x features)
    /// matrix, in window order.
    fn aggregate_feature_results(&self, numfeatures: usize) -> Result<Array2<F>> {
        let mut all_features = Vec::new();
        let mut window_indices = Vec::new();

        for (taskid, result) in &self.completed_tasks {
            if taskid.starts_with("features_window_") && result.status == TaskStatus::Completed {
                if let Some(data) = &result.data {
                    all_features.push(data.clone());

                    if let Some(window_str) = taskid.strip_prefix("features_window_") {
                        if let Ok(index) = window_str.parse::<usize>() {
                            window_indices.push(index);
                        }
                    }
                }
            }
        }

        if all_features.is_empty() {
            return Ok(Array2::zeros((0, numfeatures)));
        }

        // Restore window order (HashMap iteration order is arbitrary).
        let mut indexed_features: Vec<(usize, Array1<F>)> =
            window_indices.into_iter().zip(all_features).collect();
        indexed_features.sort_by_key(|(index_, _)| *index_);

        let num_windows = indexed_features.len();
        let feature_size = indexed_features[0].1.len().min(numfeatures);
        let mut result = Array2::zeros((num_windows, feature_size));

        for (row, (_, features)) in indexed_features.iter().enumerate() {
            for col in 0..feature_size {
                if col < features.len() {
                    result[[row, col]] = features[col];
                }
            }
        }

        Ok(result)
    }

    /// Returns a snapshot of the cluster's current state.
    pub fn get_cluster_status(&self) -> ClusterStatus {
        let total_nodes = self.nodes.len();
        let available_nodes = self
            .nodes
            .values()
            .filter(|node| node.status == NodeStatus::Available)
            .count();

        let total_running_tasks = self.running_tasks.len();
        let total_completed_tasks = self.completed_tasks.len();
        let total_queued_tasks = self.task_queue.len();

        let average_load = if total_nodes > 0 {
            self.nodes
                .values()
                .map(|node| node.current_load)
                .sum::<f64>()
                / total_nodes as f64
        } else {
            0.0
        };

        ClusterStatus {
            total_nodes,
            available_nodes,
            total_running_tasks,
            total_completed_tasks,
            total_queued_tasks,
            average_load,
            nodes: self.nodes.clone(),
        }
    }

    /// Discards all stored task results.
    pub fn clear_completed_tasks(&mut self) {
        self.completed_tasks.clear();
    }

    /// Cancels a running task, recording a Cancelled result for it.
    pub fn cancel_task(&mut self, taskid: &str) -> Result<()> {
        if let Some(_task) = self.running_tasks.remove(taskid) {
            self.completed_tasks.insert(
                taskid.to_string(),
                TaskResult {
                    taskid: taskid.to_string(),
                    status: TaskStatus::Cancelled,
                    data: None,
                    metrics: TaskMetrics::default(),
                    error: Some("Task cancelled by user".to_string()),
                },
            );
            Ok(())
        } else {
            Err(TimeSeriesError::InvalidInput(format!(
                "Task {taskid} not found in running tasks"
            )))
        }
    }
}

/// Snapshot of cluster state returned by get_cluster_status().
#[derive(Debug, Clone)]
pub struct ClusterStatus {
    /// Total number of known nodes.
    pub total_nodes: usize,
    /// Number of nodes currently accepting work.
    pub available_nodes: usize,
    /// Number of tasks currently executing.
    pub total_running_tasks: usize,
    /// Number of tasks with stored results.
    pub total_completed_tasks: usize,
    /// Number of tasks waiting in the queue.
    pub total_queued_tasks: usize,
    /// Mean load across all nodes.
    pub average_load: f64,
    /// Per-node details, keyed by address.
    pub nodes: HashMap<String, NodeInfo>,
}

/// Computes a moving average as a single distributed task.
///
/// Note: the in-process executor passes Custom tasks through unchanged, so
/// until a real moving-average kernel is wired in, this returns the input
/// series as-is.
#[allow(dead_code)]
pub fn distributed_moving_average<
    F: Float
        + Debug
        + Clone
        + scirs2_core::numeric::FromPrimitive
        + scirs2_core::numeric::Zero
        + scirs2_core::ndarray::ScalarOperand,
>(
    processor: &mut DistributedProcessor<F>,
    data: &Array1<F>,
    window_size: usize,
) -> Result<Array1<F>> {
    let task = DistributedTask {
        id: "moving_average".to_string(),
        task_type: TaskType::Custom("moving_average".to_string()),
        input_data: data.clone(),
        parameters: {
            let mut params = HashMap::new();
            params.insert("window_size".to_string(), window_size as f64);
            params
        },
        priority: TaskPriority::Normal,
        dependencies: vec![],
    };

    processor.submit_task(task)?;
    processor.process_pending_tasks()?;

    if let Some(result) = processor.completed_tasks.get("moving_average") {
        if let Some(data) = &result.data {
            Ok(data.clone())
        } else {
            Err(TimeSeriesError::ComputationError(
                "Moving average computation failed".to_string(),
            ))
        }
    } else {
        Err(TimeSeriesError::ComputationError(
            "Moving average task not found".to_string(),
        ))
    }
}
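
// Illustrative call (hypothetical data): route a 5-point moving average
// through the distributed pipeline.
//
//     let mut processor = DistributedProcessor::<f64>::new(ClusterConfig::default());
//     let series = Array1::from_vec(vec![1.0, 2.0, 3.0, 4.0, 5.0]);
//     let smoothed = distributed_moving_average(&mut processor, &series, 5)?;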

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_cluster_config_default() {
        let config = ClusterConfig::default();
        assert_eq!(config.nodes.len(), 1);
        assert_eq!(config.max_concurrent_tasks, 4);
        assert_eq!(config.chunk_size, 10000);
    }

    #[test]
    fn test_distributed_processor_creation() {
        let config = ClusterConfig::default();
        let processor: DistributedProcessor<f64> = DistributedProcessor::new(config);

        assert_eq!(processor.nodes.len(), 1);
        assert!(processor.task_queue.is_empty());
        assert!(processor.running_tasks.is_empty());
        assert!(processor.completed_tasks.is_empty());
    }

    #[test]
    fn test_task_submission() {
        let config = ClusterConfig::default();
        let mut processor: DistributedProcessor<f64> = DistributedProcessor::new(config);

        let task = DistributedTask {
            id: "test_task".to_string(),
            task_type: TaskType::Forecasting,
            input_data: Array1::from_vec(vec![1.0, 2.0, 3.0, 4.0, 5.0]),
            parameters: HashMap::new(),
            priority: TaskPriority::Normal,
            dependencies: vec![],
        };

        assert!(processor.submit_task(task).is_ok());
        assert_eq!(processor.task_queue.len(), 1);
    }

    #[test]
    fn test_task_priority_ordering() {
        let config = ClusterConfig::default();
        let mut processor: DistributedProcessor<f64> = DistributedProcessor::new(config);

        let low_task = DistributedTask {
            id: "low".to_string(),
            task_type: TaskType::Forecasting,
            input_data: Array1::zeros(10),
            parameters: HashMap::new(),
            priority: TaskPriority::Low,
            dependencies: vec![],
        };

        let high_task = DistributedTask {
            id: "high".to_string(),
            task_type: TaskType::Forecasting,
            input_data: Array1::zeros(10),
            parameters: HashMap::new(),
            priority: TaskPriority::High,
            dependencies: vec![],
        };

        processor.submit_task(low_task).expect("submit failed");
        processor.submit_task(high_task).expect("submit failed");

        // Higher-priority tasks sit at the front of the queue.
        assert_eq!(processor.task_queue[0].priority, TaskPriority::High);
        assert_eq!(processor.task_queue[1].priority, TaskPriority::Low);
    }

    #[test]
    fn test_distributed_forecasting() {
        let config = ClusterConfig::default();
        let mut processor: DistributedProcessor<f64> = DistributedProcessor::new(config);

        let data = Array1::from_vec((1..100).map(|x| x as f64).collect());
        let horizon = 10;

        let result = processor.distributed_forecast(&data, horizon, "linear");
        assert!(result.is_ok());

        let forecast = result.expect("forecast failed");
        assert_eq!(forecast.len(), horizon);
    }

    #[test]
    fn test_cluster_status() {
        let config = ClusterConfig::default();
        let processor: DistributedProcessor<f64> = DistributedProcessor::new(config);

        let status = processor.get_cluster_status();
        assert_eq!(status.total_nodes, 1);
        assert_eq!(status.available_nodes, 1);
        assert_eq!(status.total_running_tasks, 0);
        assert_eq!(status.total_completed_tasks, 0);
        assert_eq!(status.total_queued_tasks, 0);
    }

    #[test]
    fn test_load_balancing_strategies() {
        assert_ne!(
            LoadBalancingStrategy::RoundRobin,
            LoadBalancingStrategy::LoadBased
        );
        assert_ne!(
            LoadBalancingStrategy::Random,
            LoadBalancingStrategy::Weighted
        );
    }

    #[test]
    fn test_task_status_enum() {
        assert_eq!(TaskStatus::Pending, TaskStatus::Pending);
        assert_ne!(TaskStatus::Running, TaskStatus::Completed);
    }

    #[test]
    fn test_fault_tolerance_config() {
        let config = FaultToleranceConfig::default();
        assert_eq!(config.max_retries, 3);
        assert_eq!(config.replication_factor, 2);
        assert!(!config.enable_replication);
    }
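
    // Additional illustrative tests (sketches under the same in-process
    // simulated execution model as the tests above).

    #[test]
    fn test_distributed_feature_extraction_shape() {
        let config = ClusterConfig::default();
        let mut processor: DistributedProcessor<f64> = DistributedProcessor::new(config);

        // 3000 points with window_size 1000 and step 750 give windows
        // starting at 0, 750, and 1500; each yields five summary features,
        // truncated here to the two requested columns.
        let data = Array1::from_vec((0..3000).map(|x| (x % 100) as f64).collect());
        let features = vec!["mean".to_string(), "std".to_string()];

        let matrix = processor
            .distributed_feature_extraction(&data, &features)
            .expect("feature extraction failed");
        assert_eq!(matrix.nrows(), 3);
        assert_eq!(matrix.ncols(), features.len());
    }

    #[test]
    fn test_cancel_unknown_task_fails() {
        let config = ClusterConfig::default();
        let mut processor: DistributedProcessor<f64> = DistributedProcessor::new(config);

        assert!(processor.cancel_task("does_not_exist").is_err());
    }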
}