scirs2_series/
distributed.rs

1//! Distributed computing support for time series processing
2//!
3//! This module provides infrastructure for distributing time series computations
4//! across multiple nodes, supporting both synchronous and asynchronous processing.
5
6use scirs2_core::ndarray::{Array1, Array2, Axis};
7use scirs2_core::numeric::Float;
8use std::collections::HashMap;
9use std::fmt::Debug;
10use std::time::{Duration, Instant};
11
12use crate::error::{Result, TimeSeriesError};
13use statrs::statistics::Statistics;
14
15/// Configuration for distributed computing cluster
16#[derive(Debug, Clone)]
17pub struct ClusterConfig {
18    /// List of worker node addresses
19    pub nodes: Vec<String>,
20    /// Maximum number of concurrent tasks per node
21    pub max_concurrent_tasks: usize,
22    /// Timeout for task execution
23    pub task_timeout: Duration,
24    /// Chunk size for data splitting
25    pub chunk_size: usize,
26    /// Load balancing strategy
27    pub load_balancing: LoadBalancingStrategy,
28    /// Fault tolerance settings
29    pub fault_tolerance: FaultToleranceConfig,
30}
31
32impl Default for ClusterConfig {
33    fn default() -> Self {
34        Self {
35            nodes: vec!["localhost:8080".to_string()],
36            max_concurrent_tasks: 4,
37            task_timeout: Duration::from_secs(30),
38            chunk_size: 10000,
39            load_balancing: LoadBalancingStrategy::RoundRobin,
40            fault_tolerance: FaultToleranceConfig::default(),
41        }
42    }
43}
44
45/// Load balancing strategies for task distribution
46#[derive(Debug, Clone, Copy, PartialEq, Eq)]
47pub enum LoadBalancingStrategy {
48    /// Round-robin distribution across nodes
49    RoundRobin,
50    /// Distribute based on current node load
51    LoadBased,
52    /// Random distribution
53    Random,
54    /// Weighted distribution based on node capabilities
55    Weighted,
56}
57
58/// Fault tolerance configuration
59#[derive(Debug, Clone)]
60pub struct FaultToleranceConfig {
61    /// Maximum number of retry attempts
62    pub max_retries: usize,
63    /// Retry delay
64    pub retry_delay: Duration,
65    /// Whether to enable task replication
66    pub enable_replication: bool,
67    /// Replication factor
68    pub replication_factor: usize,
69    /// Node failure detection timeout
70    pub failure_detection_timeout: Duration,
71}
72
73impl Default for FaultToleranceConfig {
74    fn default() -> Self {
75        Self {
76            max_retries: 3,
77            retry_delay: Duration::from_millis(500),
78            enable_replication: false,
79            replication_factor: 2,
80            failure_detection_timeout: Duration::from_secs(10),
81        }
82    }
83}
84
85/// Task definition for distributed execution
86#[derive(Debug, Clone)]
87pub struct DistributedTask<F: Float> {
88    /// Unique task identifier
89    pub id: String,
90    /// Task type
91    pub task_type: TaskType,
92    /// Input data chunk
93    pub input_data: Array1<F>,
94    /// Additional parameters
95    pub parameters: HashMap<String, f64>,
96    /// Priority level
97    pub priority: TaskPriority,
98    /// Dependencies on other tasks
99    pub dependencies: Vec<String>,
100}
101
102/// Types of tasks that can be executed in distributed manner
103#[derive(Debug, Clone, PartialEq, Eq)]
104pub enum TaskType {
105    /// Time series forecasting
106    Forecasting,
107    /// Decomposition analysis
108    Decomposition,
109    /// Feature extraction
110    FeatureExtraction,
111    /// Anomaly detection
112    AnomalyDetection,
113    /// Cross-correlation computation
114    CrossCorrelation,
115    /// Fourier transform
116    FourierTransform,
117    /// Wavelet transform
118    WaveletTransform,
119    /// Custom user-defined task
120    Custom(String),
121}
122
123/// Task priority levels
124#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
125pub enum TaskPriority {
126    /// Low priority
127    Low = 1,
128    /// Normal priority
129    Normal = 2,
130    /// High priority
131    High = 3,
132    /// Critical priority
133    Critical = 4,
134}
135
136/// Result of a distributed task execution
137#[derive(Debug, Clone)]
138pub struct TaskResult<F: Float> {
139    /// Task identifier
140    pub taskid: String,
141    /// Execution status
142    pub status: TaskStatus,
143    /// Result data
144    pub data: Option<Array1<F>>,
145    /// Execution metrics
146    pub metrics: TaskMetrics,
147    /// Error information if failed
148    pub error: Option<String>,
149}
150
151/// Task execution status
152#[derive(Debug, Clone, Copy, PartialEq, Eq)]
153pub enum TaskStatus {
154    /// Task is pending execution
155    Pending,
156    /// Task is currently running
157    Running,
158    /// Task completed successfully
159    Completed,
160    /// Task failed
161    Failed,
162    /// Task was cancelled
163    Cancelled,
164    /// Task timed out
165    Timeout,
166}
167
168/// Metrics for task execution
169#[derive(Debug, Clone)]
170pub struct TaskMetrics {
171    /// Execution time
172    pub execution_time: Duration,
173    /// Node that executed the task
174    pub executed_on: String,
175    /// Memory usage during execution
176    pub memory_usage: usize,
177    /// CPU utilization
178    pub cpu_utilization: f64,
179    /// Network transfer time
180    pub network_time: Duration,
181}
182
183impl Default for TaskMetrics {
184    fn default() -> Self {
185        Self {
186            execution_time: Duration::ZERO,
187            executed_on: String::new(),
188            memory_usage: 0,
189            cpu_utilization: 0.0,
190            network_time: Duration::ZERO,
191        }
192    }
193}
194
195/// Node information in the cluster
196#[derive(Debug, Clone)]
197pub struct NodeInfo {
198    /// Node address
199    pub address: String,
200    /// Current status
201    pub status: NodeStatus,
202    /// Available CPU cores
203    pub cpu_cores: usize,
204    /// Total memory in bytes
205    pub total_memory: usize,
206    /// Available memory in bytes
207    pub available_memory: usize,
208    /// Current load (0.0 to 1.0)
209    pub current_load: f64,
210    /// Number of running tasks
211    pub running_tasks: usize,
212    /// Node capabilities
213    pub capabilities: Vec<String>,
214    /// Last heartbeat timestamp
215    pub last_heartbeat: Instant,
216}
217
218/// Node status in the cluster
219#[derive(Debug, Clone, Copy, PartialEq, Eq)]
220pub enum NodeStatus {
221    /// Node is available for tasks
222    Available,
223    /// Node is busy with tasks
224    Busy,
225    /// Node is offline
226    Offline,
227    /// Node is under maintenance
228    Maintenance,
229    /// Node has failed
230    Failed,
231}
232
233/// Distributed time series processor
234pub struct DistributedProcessor<
235    F: Float
236        + Debug
237        + Clone
238        + scirs2_core::numeric::FromPrimitive
239        + scirs2_core::numeric::Zero
240        + scirs2_core::ndarray::ScalarOperand,
241> {
242    /// Cluster configuration
243    config: ClusterConfig,
244    /// Node registry
245    nodes: HashMap<String, NodeInfo>,
246    /// Task queue
247    task_queue: Vec<DistributedTask<F>>,
248    /// Running tasks
249    running_tasks: HashMap<String, DistributedTask<F>>,
250    /// Completed tasks
251    completed_tasks: HashMap<String, TaskResult<F>>,
252    /// Load balancer state
253    load_balancer_state: LoadBalancerState,
254}
255
256/// Internal state for load balancer
257#[derive(Debug, Default)]
258struct LoadBalancerState {
259    /// Round-robin counter
260    round_robin_counter: usize,
261    /// Node weights for weighted distribution
262    #[allow(dead_code)]
263    node_weights: HashMap<String, f64>,
264    /// Node load history
265    #[allow(dead_code)]
266    load_history: HashMap<String, Vec<f64>>,
267}
268
269impl<
270        F: Float
271            + Debug
272            + Clone
273            + scirs2_core::numeric::FromPrimitive
274            + scirs2_core::numeric::Zero
275            + scirs2_core::ndarray::ScalarOperand,
276    > DistributedProcessor<F>
277{
278    /// Create a new distributed processor
279    pub fn new(config: ClusterConfig) -> Self {
280        let mut nodes = HashMap::new();
281
282        // Initialize node information
283        for address in &config.nodes {
284            nodes.insert(
285                address.clone(),
286                NodeInfo {
287                    address: address.clone(),
288                    status: NodeStatus::Available,
289                    cpu_cores: 4, // Default values - in practice would be detected
290                    total_memory: 8 * 1024 * 1024 * 1024, // 8GB
291                    available_memory: 6 * 1024 * 1024 * 1024, // 6GB
292                    current_load: 0.0,
293                    running_tasks: 0,
294                    capabilities: vec!["time_series".to_string(), "forecasting".to_string()],
295                    last_heartbeat: Instant::now(),
296                },
297            );
298        }
299
300        Self {
301            config,
302            nodes,
303            task_queue: Vec::new(),
304            running_tasks: HashMap::new(),
305            completed_tasks: HashMap::new(),
306            load_balancer_state: LoadBalancerState::default(),
307        }
308    }
309
310    /// Submit a task for distributed execution
311    pub fn submit_task(&mut self, task: DistributedTask<F>) -> Result<()> {
312        // Validate task dependencies
313        for dep_id in &task.dependencies {
314            if !self.completed_tasks.contains_key(dep_id)
315                && !self.running_tasks.contains_key(dep_id)
316            {
317                return Err(TimeSeriesError::InvalidInput(format!(
318                    "Dependency task {dep_id} not found"
319                )));
320            }
321        }
322
323        // Add to queue with priority ordering (higher priority first)
324        let insert_pos = self
325            .task_queue
326            .binary_search_by(|t| t.priority.cmp(&task.priority).reverse())
327            .unwrap_or_else(|pos| pos);
328
329        self.task_queue.insert(insert_pos, task);
330        Ok(())
331    }
332
333    /// Process distributed time series forecasting
334    pub fn distributed_forecast(
335        &mut self,
336        data: &Array1<F>,
337        horizon: usize,
338        method: &str,
339    ) -> Result<Array1<F>> {
340        // Split data into chunks for parallel processing
341        let chunk_size = self
342            .config
343            .chunk_size
344            .min(data.len() / self.config.nodes.len().max(1));
345        let chunks: Vec<Array1<F>> = data
346            .axis_chunks_iter(Axis(0), chunk_size)
347            .map(|chunk| chunk.to_owned())
348            .collect();
349
350        // Create forecasting tasks for each chunk
351        let mut tasks = Vec::new();
352        for (i, chunk) in chunks.iter().enumerate() {
353            let mut params = HashMap::new();
354            params.insert("horizon".to_string(), horizon as f64);
355            params.insert("chunk_index".to_string(), i as f64);
356
357            let task = DistributedTask {
358                id: format!("forecast_chunk_{i}"),
359                task_type: TaskType::Forecasting,
360                input_data: chunk.clone(),
361                parameters: params,
362                priority: TaskPriority::Normal,
363                dependencies: vec![],
364            };
365
366            tasks.push(task);
367        }
368
369        // Submit tasks
370        for task in tasks {
371            self.submit_task(task)?;
372        }
373
374        // Process tasks (simplified simulation)
375        self.process_pending_tasks()?;
376
377        // Aggregate results
378        self.aggregate_forecast_results(horizon)
379    }
380
381    /// Process distributed feature extraction
382    pub fn distributed_feature_extraction(
383        &mut self,
384        data: &Array1<F>,
385        features: &[String],
386    ) -> Result<Array2<F>> {
387        // Split data into overlapping windows for feature extraction
388        let window_size = 1000.min(data.len() / 2);
389        let overlap = window_size / 4;
390        let step = window_size - overlap;
391
392        let mut tasks = Vec::new();
393        let mut i = 0;
394        let mut start = 0;
395
396        while start + window_size <= data.len() {
397            let end = (start + window_size).min(data.len());
398            let window = data.slice(scirs2_core::ndarray::s![start..end]).to_owned();
399
400            let mut params = HashMap::new();
401            params.insert("window_index".to_string(), i as f64);
402            params.insert("window_size".to_string(), window_size as f64);
403
404            let task = DistributedTask {
405                id: format!("features_window_{i}"),
406                task_type: TaskType::FeatureExtraction,
407                input_data: window,
408                parameters: params,
409                priority: TaskPriority::Normal,
410                dependencies: vec![],
411            };
412
413            tasks.push(task);
414            start += step;
415            i += 1;
416        }
417
418        // Submit tasks
419        for task in tasks {
420            self.submit_task(task)?;
421        }
422
423        // Process tasks
424        self.process_pending_tasks()?;
425
426        // Aggregate feature results
427        self.aggregate_feature_results(features.len())
428    }
429
430    /// Select optimal node for task execution
431    fn select_node_for_task(&mut self, task: &DistributedTask<F>) -> Result<String> {
432        let available_nodes: Vec<&String> = self
433            .nodes
434            .iter()
435            .filter(|(_, info)| {
436                info.status == NodeStatus::Available
437                    && info.running_tasks < self.config.max_concurrent_tasks
438            })
439            .map(|(address, _)| address)
440            .collect();
441
442        if available_nodes.is_empty() {
443            return Err(TimeSeriesError::ComputationError(
444                "No available nodes for task execution".to_string(),
445            ));
446        }
447
448        let selected_node = match self.config.load_balancing {
449            LoadBalancingStrategy::RoundRobin => {
450                let index = self.load_balancer_state.round_robin_counter % available_nodes.len();
451                self.load_balancer_state.round_robin_counter += 1;
452                available_nodes[index].clone()
453            }
454            LoadBalancingStrategy::LoadBased => {
455                // Select node with lowest current load
456                available_nodes
457                    .iter()
458                    .min_by(|a, b| {
459                        let load_a = self.nodes.get(*a as &str).unwrap().current_load;
460                        let load_b = self.nodes.get(*b as &str).unwrap().current_load;
461                        load_a
462                            .partial_cmp(&load_b)
463                            .unwrap_or(std::cmp::Ordering::Equal)
464                    })
465                    .unwrap()
466                    .to_string()
467            }
468            LoadBalancingStrategy::Random => {
469                // Simple hash-based selection for deterministic behavior
470                let hash = task.id.len() % available_nodes.len();
471                available_nodes[hash].clone()
472            }
473            LoadBalancingStrategy::Weighted => {
474                // Select based on node weights (simplified implementation)
475                available_nodes[0].clone() // Fallback to first available
476            }
477        };
478
479        Ok(selected_node)
480    }
481
482    /// Process pending tasks in the queue
483    fn process_pending_tasks(&mut self) -> Result<()> {
484        while let Some(task) = self.task_queue.pop() {
485            // Check if dependencies are satisfied
486            let dependencies_satisfied = task.dependencies.iter().all(|dep_id| {
487                self.completed_tasks
488                    .get(dep_id)
489                    .map(|result| result.status == TaskStatus::Completed)
490                    .unwrap_or(false)
491            });
492
493            if !dependencies_satisfied {
494                // Put task back in queue
495                self.task_queue.push(task);
496                continue;
497            }
498
499            // Select node for execution
500            let node_address = self.select_node_for_task(&task)?;
501
502            // Simulate task execution
503            let result = self.execute_task_on_node(&task, &node_address)?;
504
505            // Store result
506            self.completed_tasks.insert(task.id.clone(), result);
507            self.running_tasks.remove(&task.id);
508        }
509
510        Ok(())
511    }
512
513    /// Execute a task on a specific node (simulated)
514    fn execute_task_on_node(
515        &mut self,
516        task: &DistributedTask<F>,
517        node_address: &str,
518    ) -> Result<TaskResult<F>> {
519        let start_time = Instant::now();
520
521        // Mark task as running
522        self.running_tasks.insert(task.id.clone(), task.clone());
523
524        // Update node status
525        if let Some(node) = self.nodes.get_mut(node_address) {
526            node.running_tasks += 1;
527            node.current_load = node.running_tasks as f64 / self.config.max_concurrent_tasks as f64;
528        }
529
530        // Simulate task execution based on task type
531        let result_data = match task.task_type {
532            TaskType::Forecasting => self.simulate_forecasting_task(task)?,
533            TaskType::FeatureExtraction => self.simulate_feature_extraction_task(task)?,
534            TaskType::AnomalyDetection => self.simulate_anomaly_detection_task(task)?,
535            TaskType::Decomposition => self.simulate_decomposition_task(task)?,
536            _ => {
537                // Generic processing
538                task.input_data.clone()
539            }
540        };
541
542        let execution_time = start_time.elapsed();
543
544        // Update node status
545        if let Some(node) = self.nodes.get_mut(node_address) {
546            node.running_tasks = node.running_tasks.saturating_sub(1);
547            node.current_load = node.running_tasks as f64 / self.config.max_concurrent_tasks as f64;
548        }
549
550        Ok(TaskResult {
551            taskid: task.id.clone(),
552            status: TaskStatus::Completed,
553            data: Some(result_data),
554            metrics: TaskMetrics {
555                execution_time,
556                executed_on: node_address.to_string(),
557                memory_usage: task.input_data.len() * std::mem::size_of::<F>(),
558                cpu_utilization: 0.8,                    // Simulated
559                network_time: Duration::from_millis(10), // Simulated
560            },
561            error: None,
562        })
563    }
564
565    /// Simulate forecasting task execution
566    fn simulate_forecasting_task(&self, task: &DistributedTask<F>) -> Result<Array1<F>> {
567        let horizon = task
568            .parameters
569            .get("horizon")
570            .map(|&h| h as usize)
571            .unwrap_or(10);
572
573        // Simple linear extrapolation for simulation
574        let data = &task.input_data;
575        if data.len() < 2 {
576            return Ok(Array1::zeros(horizon));
577        }
578
579        let slope = (data[data.len() - 1] - data[data.len() - 2]) / F::one();
580        let mut forecast = Array1::zeros(horizon);
581
582        for i in 0..horizon {
583            forecast[i] = data[data.len() - 1] + slope * F::from(i + 1).unwrap();
584        }
585
586        Ok(forecast)
587    }
588
589    /// Simulate feature extraction task execution
590    fn simulate_feature_extraction_task(&self, task: &DistributedTask<F>) -> Result<Array1<F>> {
591        let data = &task.input_data;
592
593        // Extract basic statistical features
594        let mean = data.mean().unwrap_or(F::zero());
595        let variance = data.var(F::zero());
596        let min = data.iter().fold(F::infinity(), |acc, &x| acc.min(x));
597        let max = data.iter().fold(F::neg_infinity(), |acc, &x| acc.max(x));
598
599        // Simulate more features
600        let features = vec![
601            mean,
602            variance.sqrt(), // Standard deviation
603            min,
604            max,
605            max - min, // Range
606        ];
607
608        Ok(Array1::from_vec(features))
609    }
610
611    /// Simulate anomaly detection task execution
612    fn simulate_anomaly_detection_task(&self, task: &DistributedTask<F>) -> Result<Array1<F>> {
613        let data = &task.input_data;
614        let mean = data.mean().unwrap_or(F::zero());
615        let std_dev = data.var(F::zero()).sqrt();
616
617        // Simple z-score based anomaly detection
618        let threshold = F::from(3.0).unwrap();
619        let mut anomaly_scores = Array1::zeros(data.len());
620
621        for (i, &value) in data.iter().enumerate() {
622            let z_score = (value - mean) / std_dev;
623            anomaly_scores[i] = if z_score.abs() > threshold {
624                F::one()
625            } else {
626                F::zero()
627            };
628        }
629
630        Ok(anomaly_scores)
631    }
632
633    /// Simulate decomposition task execution
634    fn simulate_decomposition_task(&self, task: &DistributedTask<F>) -> Result<Array1<F>> {
635        // Simple trend extraction using moving average
636        let data = &task.input_data;
637        let window_size = 10.min(data.len() / 2);
638        let mut trend = Array1::zeros(data.len());
639
640        for i in 0..data.len() {
641            let start = i.saturating_sub(window_size / 2);
642            let end = (i + window_size / 2 + 1).min(data.len());
643
644            let window_sum = data.slice(scirs2_core::ndarray::s![start..end]).sum();
645            let window_len = F::from(end - start).unwrap();
646            trend[i] = window_sum / window_len;
647        }
648
649        Ok(trend)
650    }
651
652    /// Aggregate forecasting results from multiple tasks
653    fn aggregate_forecast_results(&self, horizon: usize) -> Result<Array1<F>> {
654        let mut all_forecasts = Vec::new();
655        let mut chunk_indices = Vec::new();
656
657        // Collect all forecast results
658        for (taskid, result) in &self.completed_tasks {
659            if taskid.starts_with("forecast_chunk_") && result.status == TaskStatus::Completed {
660                if let Some(data) = &result.data {
661                    all_forecasts.push(data.clone());
662
663                    // Extract chunk index for proper ordering
664                    if let Some(chunk_str) = taskid.strip_prefix("forecast_chunk_") {
665                        if let Ok(index) = chunk_str.parse::<usize>() {
666                            chunk_indices.push(index);
667                        }
668                    }
669                }
670            }
671        }
672
673        if all_forecasts.is_empty() {
674            return Ok(Array1::zeros(horizon));
675        }
676
677        // Sort forecasts by chunk index
678        let mut indexed_forecasts: Vec<(usize, Array1<F>)> =
679            chunk_indices.into_iter().zip(all_forecasts).collect();
680        indexed_forecasts.sort_by_key(|(index_, _)| *index_);
681
682        // Aggregate by averaging (simple ensemble approach)
683        let mut final_forecast = Array1::zeros(horizon);
684        let mut count = 0;
685
686        for (_, forecast) in indexed_forecasts {
687            let actual_horizon = forecast.len().min(horizon);
688            for i in 0..actual_horizon {
689                final_forecast[i] = final_forecast[i] + forecast[i];
690            }
691            count += 1;
692        }
693
694        if count > 0 {
695            final_forecast = final_forecast / F::from(count).unwrap();
696        }
697
698        Ok(final_forecast)
699    }
700
701    /// Aggregate feature extraction results
702    fn aggregate_feature_results(&self, numfeatures: usize) -> Result<Array2<F>> {
703        let mut all_features = Vec::new();
704        let mut window_indices = Vec::new();
705
706        // Collect all feature results
707        for (taskid, result) in &self.completed_tasks {
708            if taskid.starts_with("features_window_") && result.status == TaskStatus::Completed {
709                if let Some(data) = &result.data {
710                    all_features.push(data.clone());
711
712                    // Extract window index
713                    if let Some(window_str) = taskid.strip_prefix("features_window_") {
714                        if let Ok(index) = window_str.parse::<usize>() {
715                            window_indices.push(index);
716                        }
717                    }
718                }
719            }
720        }
721
722        if all_features.is_empty() {
723            return Ok(Array2::zeros((0, numfeatures)));
724        }
725
726        // Sort by window index
727        let mut indexed_features: Vec<(usize, Array1<F>)> =
728            window_indices.into_iter().zip(all_features).collect();
729        indexed_features.sort_by_key(|(index_, _)| *index_);
730
731        // Combine into matrix
732        let num_windows = indexed_features.len();
733        let feature_size = indexed_features[0].1.len().min(numfeatures);
734        let mut result = Array2::zeros((num_windows, feature_size));
735
736        for (row, (_, features)) in indexed_features.iter().enumerate() {
737            for col in 0..feature_size {
738                if col < features.len() {
739                    result[[row, col]] = features[col];
740                }
741            }
742        }
743
744        Ok(result)
745    }
746
747    /// Get cluster status information
748    pub fn get_cluster_status(&self) -> ClusterStatus {
749        let total_nodes = self.nodes.len();
750        let available_nodes = self
751            .nodes
752            .values()
753            .filter(|node| node.status == NodeStatus::Available)
754            .count();
755
756        let total_running_tasks = self.running_tasks.len();
757        let total_completed_tasks = self.completed_tasks.len();
758        let total_queued_tasks = self.task_queue.len();
759
760        let average_load = if total_nodes > 0 {
761            self.nodes
762                .values()
763                .map(|node| node.current_load)
764                .sum::<f64>()
765                / total_nodes as f64
766        } else {
767            0.0
768        };
769
770        ClusterStatus {
771            total_nodes,
772            available_nodes,
773            total_running_tasks,
774            total_completed_tasks,
775            total_queued_tasks,
776            average_load,
777            nodes: self.nodes.clone(),
778        }
779    }
780
781    /// Clear completed tasks to free memory
782    pub fn clear_completed_tasks(&mut self) {
783        self.completed_tasks.clear();
784    }
785
786    /// Cancel a running task
787    pub fn cancel_task(&mut self, taskid: &str) -> Result<()> {
788        if let Some(_task) = self.running_tasks.remove(taskid) {
789            // Add cancelled result
790            self.completed_tasks.insert(
791                taskid.to_string(),
792                TaskResult {
793                    taskid: taskid.to_string(),
794                    status: TaskStatus::Cancelled,
795                    data: None,
796                    metrics: TaskMetrics::default(),
797                    error: Some("Task cancelled by user".to_string()),
798                },
799            );
800            Ok(())
801        } else {
802            Err(TimeSeriesError::InvalidInput(format!(
803                "Task {taskid} not found in running tasks"
804            )))
805        }
806    }
807}
808
809/// Cluster status information
810#[derive(Debug, Clone)]
811pub struct ClusterStatus {
812    /// Total number of nodes in cluster
813    pub total_nodes: usize,
814    /// Number of available nodes
815    pub available_nodes: usize,
816    /// Number of currently running tasks
817    pub total_running_tasks: usize,
818    /// Number of completed tasks
819    pub total_completed_tasks: usize,
820    /// Number of tasks in queue
821    pub total_queued_tasks: usize,
822    /// Average load across all nodes
823    pub average_load: f64,
824    /// Detailed node information
825    pub nodes: HashMap<String, NodeInfo>,
826}
827
828/// Convenience functions for common distributed operations
829#[allow(dead_code)]
830pub fn distributed_moving_average<
831    F: Float
832        + Debug
833        + Clone
834        + scirs2_core::numeric::FromPrimitive
835        + scirs2_core::numeric::Zero
836        + scirs2_core::ndarray::ScalarOperand,
837>(
838    processor: &mut DistributedProcessor<F>,
839    data: &Array1<F>,
840    window_size: usize,
841) -> Result<Array1<F>> {
842    // Create custom task for moving average computation
843    let task = DistributedTask {
844        id: "moving_average".to_string(),
845        task_type: TaskType::Custom("moving_average".to_string()),
846        input_data: data.clone(),
847        parameters: {
848            let mut params = HashMap::new();
849            params.insert("window_size".to_string(), window_size as f64);
850            params
851        },
852        priority: TaskPriority::Normal,
853        dependencies: vec![],
854    };
855
856    processor.submit_task(task)?;
857    processor.process_pending_tasks()?;
858
859    // Get result
860    if let Some(result) = processor.completed_tasks.get("moving_average") {
861        if let Some(data) = &result.data {
862            Ok(data.clone())
863        } else {
864            Err(TimeSeriesError::ComputationError(
865                "Moving average computation failed".to_string(),
866            ))
867        }
868    } else {
869        Err(TimeSeriesError::ComputationError(
870            "Moving average task not found".to_string(),
871        ))
872    }
873}
874
875#[cfg(test)]
876mod tests {
877    use super::*;
878
879    #[test]
880    fn test_cluster_config_default() {
881        let config = ClusterConfig::default();
882        assert_eq!(config.nodes.len(), 1);
883        assert_eq!(config.max_concurrent_tasks, 4);
884        assert_eq!(config.chunk_size, 10000);
885    }
886
887    #[test]
888    fn test_distributed_processor_creation() {
889        let config = ClusterConfig::default();
890        let processor: DistributedProcessor<f64> = DistributedProcessor::new(config);
891
892        assert_eq!(processor.nodes.len(), 1);
893        assert!(processor.task_queue.is_empty());
894        assert!(processor.running_tasks.is_empty());
895        assert!(processor.completed_tasks.is_empty());
896    }
897
898    #[test]
899    fn test_task_submission() {
900        let config = ClusterConfig::default();
901        let mut processor: DistributedProcessor<f64> = DistributedProcessor::new(config);
902
903        let task = DistributedTask {
904            id: "test_task".to_string(),
905            task_type: TaskType::Forecasting,
906            input_data: Array1::from_vec(vec![1.0, 2.0, 3.0, 4.0, 5.0]),
907            parameters: HashMap::new(),
908            priority: TaskPriority::Normal,
909            dependencies: vec![],
910        };
911
912        assert!(processor.submit_task(task).is_ok());
913        assert_eq!(processor.task_queue.len(), 1);
914    }
915
916    #[test]
917    fn test_task_priority_ordering() {
918        let config = ClusterConfig::default();
919        let mut processor: DistributedProcessor<f64> = DistributedProcessor::new(config);
920
921        // Submit tasks with different priorities
922        let low_task = DistributedTask {
923            id: "low".to_string(),
924            task_type: TaskType::Forecasting,
925            input_data: Array1::zeros(10),
926            parameters: HashMap::new(),
927            priority: TaskPriority::Low,
928            dependencies: vec![],
929        };
930
931        let high_task = DistributedTask {
932            id: "high".to_string(),
933            task_type: TaskType::Forecasting,
934            input_data: Array1::zeros(10),
935            parameters: HashMap::new(),
936            priority: TaskPriority::High,
937            dependencies: vec![],
938        };
939
940        processor.submit_task(low_task).unwrap();
941        processor.submit_task(high_task).unwrap();
942
943        // High priority task should be first
944        assert_eq!(processor.task_queue[0].priority, TaskPriority::High);
945        assert_eq!(processor.task_queue[1].priority, TaskPriority::Low);
946    }
947
948    #[test]
949    fn test_distributed_forecasting() {
950        let config = ClusterConfig::default();
951        let mut processor: DistributedProcessor<f64> = DistributedProcessor::new(config);
952
953        let data = Array1::from_vec((1..100).map(|x| x as f64).collect());
954        let horizon = 10;
955
956        let result = processor.distributed_forecast(&data, horizon, "linear");
957        assert!(result.is_ok());
958
959        let forecast = result.unwrap();
960        assert_eq!(forecast.len(), horizon);
961    }
962
963    #[test]
964    fn test_cluster_status() {
965        let config = ClusterConfig::default();
966        let processor: DistributedProcessor<f64> = DistributedProcessor::new(config);
967
968        let status = processor.get_cluster_status();
969        assert_eq!(status.total_nodes, 1);
970        assert_eq!(status.available_nodes, 1);
971        assert_eq!(status.total_running_tasks, 0);
972        assert_eq!(status.total_completed_tasks, 0);
973        assert_eq!(status.total_queued_tasks, 0);
974    }
975
976    #[test]
977    fn test_load_balancing_strategies() {
978        // Test that different strategies are properly defined
979        assert_ne!(
980            LoadBalancingStrategy::RoundRobin,
981            LoadBalancingStrategy::LoadBased
982        );
983        assert_ne!(
984            LoadBalancingStrategy::Random,
985            LoadBalancingStrategy::Weighted
986        );
987    }
988
989    #[test]
990    fn test_task_status_enum() {
991        assert_eq!(TaskStatus::Pending, TaskStatus::Pending);
992        assert_ne!(TaskStatus::Running, TaskStatus::Completed);
993    }
994
995    #[test]
996    fn test_fault_tolerance_config() {
997        let config = FaultToleranceConfig::default();
998        assert_eq!(config.max_retries, 3);
999        assert_eq!(config.replication_factor, 2);
1000        assert!(!config.enable_replication);
1001    }
1002}