Skip to main content

scirs2_series/
distributed.rs

1//! Distributed computing support for time series processing
2//!
3//! This module provides infrastructure for distributing time series computations
4//! across multiple nodes, supporting both synchronous and asynchronous processing.
5
6use scirs2_core::ndarray::ArrayStatCompat;
7use scirs2_core::ndarray::{Array1, Array2, Axis};
8use scirs2_core::numeric::Float;
9use std::collections::HashMap;
10use std::fmt::Debug;
11use std::time::{Duration, Instant};
12
13use crate::error::{Result, TimeSeriesError};
14use statrs::statistics::Statistics;
15
16/// Configuration for distributed computing cluster
17#[derive(Debug, Clone)]
18pub struct ClusterConfig {
19    /// List of worker node addresses
20    pub nodes: Vec<String>,
21    /// Maximum number of concurrent tasks per node
22    pub max_concurrent_tasks: usize,
23    /// Timeout for task execution
24    pub task_timeout: Duration,
25    /// Chunk size for data splitting
26    pub chunk_size: usize,
27    /// Load balancing strategy
28    pub load_balancing: LoadBalancingStrategy,
29    /// Fault tolerance settings
30    pub fault_tolerance: FaultToleranceConfig,
31}
32
33impl Default for ClusterConfig {
34    fn default() -> Self {
35        Self {
36            nodes: vec!["localhost:8080".to_string()],
37            max_concurrent_tasks: 4,
38            task_timeout: Duration::from_secs(30),
39            chunk_size: 10000,
40            load_balancing: LoadBalancingStrategy::RoundRobin,
41            fault_tolerance: FaultToleranceConfig::default(),
42        }
43    }
44}
45
46/// Load balancing strategies for task distribution
47#[derive(Debug, Clone, Copy, PartialEq, Eq)]
48pub enum LoadBalancingStrategy {
49    /// Round-robin distribution across nodes
50    RoundRobin,
51    /// Distribute based on current node load
52    LoadBased,
53    /// Random distribution
54    Random,
55    /// Weighted distribution based on node capabilities
56    Weighted,
57}
58
59/// Fault tolerance configuration
60#[derive(Debug, Clone)]
61pub struct FaultToleranceConfig {
62    /// Maximum number of retry attempts
63    pub max_retries: usize,
64    /// Retry delay
65    pub retry_delay: Duration,
66    /// Whether to enable task replication
67    pub enable_replication: bool,
68    /// Replication factor
69    pub replication_factor: usize,
70    /// Node failure detection timeout
71    pub failure_detection_timeout: Duration,
72}
73
74impl Default for FaultToleranceConfig {
75    fn default() -> Self {
76        Self {
77            max_retries: 3,
78            retry_delay: Duration::from_millis(500),
79            enable_replication: false,
80            replication_factor: 2,
81            failure_detection_timeout: Duration::from_secs(10),
82        }
83    }
84}
85
86/// Task definition for distributed execution
87#[derive(Debug, Clone)]
88pub struct DistributedTask<F: Float> {
89    /// Unique task identifier
90    pub id: String,
91    /// Task type
92    pub task_type: TaskType,
93    /// Input data chunk
94    pub input_data: Array1<F>,
95    /// Additional parameters
96    pub parameters: HashMap<String, f64>,
97    /// Priority level
98    pub priority: TaskPriority,
99    /// Dependencies on other tasks
100    pub dependencies: Vec<String>,
101}
102
103/// Types of tasks that can be executed in distributed manner
104#[derive(Debug, Clone, PartialEq, Eq)]
105pub enum TaskType {
106    /// Time series forecasting
107    Forecasting,
108    /// Decomposition analysis
109    Decomposition,
110    /// Feature extraction
111    FeatureExtraction,
112    /// Anomaly detection
113    AnomalyDetection,
114    /// Cross-correlation computation
115    CrossCorrelation,
116    /// Fourier transform
117    FourierTransform,
118    /// Wavelet transform
119    WaveletTransform,
120    /// Custom user-defined task
121    Custom(String),
122}
123
124/// Task priority levels
125#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
126pub enum TaskPriority {
127    /// Low priority
128    Low = 1,
129    /// Normal priority
130    Normal = 2,
131    /// High priority
132    High = 3,
133    /// Critical priority
134    Critical = 4,
135}
136
137/// Result of a distributed task execution
138#[derive(Debug, Clone)]
139pub struct TaskResult<F: Float> {
140    /// Task identifier
141    pub taskid: String,
142    /// Execution status
143    pub status: TaskStatus,
144    /// Result data
145    pub data: Option<Array1<F>>,
146    /// Execution metrics
147    pub metrics: TaskMetrics,
148    /// Error information if failed
149    pub error: Option<String>,
150}
151
152/// Task execution status
153#[derive(Debug, Clone, Copy, PartialEq, Eq)]
154pub enum TaskStatus {
155    /// Task is pending execution
156    Pending,
157    /// Task is currently running
158    Running,
159    /// Task completed successfully
160    Completed,
161    /// Task failed
162    Failed,
163    /// Task was cancelled
164    Cancelled,
165    /// Task timed out
166    Timeout,
167}
168
169/// Metrics for task execution
170#[derive(Debug, Clone)]
171pub struct TaskMetrics {
172    /// Execution time
173    pub execution_time: Duration,
174    /// Node that executed the task
175    pub executed_on: String,
176    /// Memory usage during execution
177    pub memory_usage: usize,
178    /// CPU utilization
179    pub cpu_utilization: f64,
180    /// Network transfer time
181    pub network_time: Duration,
182}
183
184impl Default for TaskMetrics {
185    fn default() -> Self {
186        Self {
187            execution_time: Duration::ZERO,
188            executed_on: String::new(),
189            memory_usage: 0,
190            cpu_utilization: 0.0,
191            network_time: Duration::ZERO,
192        }
193    }
194}
195
196/// Node information in the cluster
197#[derive(Debug, Clone)]
198pub struct NodeInfo {
199    /// Node address
200    pub address: String,
201    /// Current status
202    pub status: NodeStatus,
203    /// Available CPU cores
204    pub cpu_cores: usize,
205    /// Total memory in bytes
206    pub total_memory: usize,
207    /// Available memory in bytes
208    pub available_memory: usize,
209    /// Current load (0.0 to 1.0)
210    pub current_load: f64,
211    /// Number of running tasks
212    pub running_tasks: usize,
213    /// Node capabilities
214    pub capabilities: Vec<String>,
215    /// Last heartbeat timestamp
216    pub last_heartbeat: Instant,
217}
218
219/// Node status in the cluster
220#[derive(Debug, Clone, Copy, PartialEq, Eq)]
221pub enum NodeStatus {
222    /// Node is available for tasks
223    Available,
224    /// Node is busy with tasks
225    Busy,
226    /// Node is offline
227    Offline,
228    /// Node is under maintenance
229    Maintenance,
230    /// Node has failed
231    Failed,
232}
233
234/// Distributed time series processor
235pub struct DistributedProcessor<
236    F: Float
237        + Debug
238        + Clone
239        + scirs2_core::numeric::FromPrimitive
240        + scirs2_core::numeric::Zero
241        + scirs2_core::ndarray::ScalarOperand,
242> {
243    /// Cluster configuration
244    config: ClusterConfig,
245    /// Node registry
246    nodes: HashMap<String, NodeInfo>,
247    /// Task queue
248    task_queue: Vec<DistributedTask<F>>,
249    /// Running tasks
250    running_tasks: HashMap<String, DistributedTask<F>>,
251    /// Completed tasks
252    completed_tasks: HashMap<String, TaskResult<F>>,
253    /// Load balancer state
254    load_balancer_state: LoadBalancerState,
255}
256
257/// Internal state for load balancer
258#[derive(Debug, Default)]
259struct LoadBalancerState {
260    /// Round-robin counter
261    round_robin_counter: usize,
262    /// Node weights for weighted distribution
263    #[allow(dead_code)]
264    node_weights: HashMap<String, f64>,
265    /// Node load history
266    #[allow(dead_code)]
267    load_history: HashMap<String, Vec<f64>>,
268}
269
270impl<
271        F: Float
272            + Debug
273            + Clone
274            + scirs2_core::numeric::FromPrimitive
275            + scirs2_core::numeric::Zero
276            + scirs2_core::ndarray::ScalarOperand,
277    > DistributedProcessor<F>
278{
279    /// Create a new distributed processor
280    pub fn new(config: ClusterConfig) -> Self {
281        let mut nodes = HashMap::new();
282
283        // Initialize node information
284        for address in &config.nodes {
285            nodes.insert(
286                address.clone(),
287                NodeInfo {
288                    address: address.clone(),
289                    status: NodeStatus::Available,
290                    cpu_cores: 4, // Default values - in practice would be detected
291                    total_memory: 8 * 1024 * 1024 * 1024, // 8GB
292                    available_memory: 6 * 1024 * 1024 * 1024, // 6GB
293                    current_load: 0.0,
294                    running_tasks: 0,
295                    capabilities: vec!["time_series".to_string(), "forecasting".to_string()],
296                    last_heartbeat: Instant::now(),
297                },
298            );
299        }
300
301        Self {
302            config,
303            nodes,
304            task_queue: Vec::new(),
305            running_tasks: HashMap::new(),
306            completed_tasks: HashMap::new(),
307            load_balancer_state: LoadBalancerState::default(),
308        }
309    }
310
311    /// Submit a task for distributed execution
312    pub fn submit_task(&mut self, task: DistributedTask<F>) -> Result<()> {
313        // Validate task dependencies
314        for dep_id in &task.dependencies {
315            if !self.completed_tasks.contains_key(dep_id)
316                && !self.running_tasks.contains_key(dep_id)
317            {
318                return Err(TimeSeriesError::InvalidInput(format!(
319                    "Dependency task {dep_id} not found"
320                )));
321            }
322        }
323
324        // Add to queue with priority ordering (higher priority first)
325        let insert_pos = self
326            .task_queue
327            .binary_search_by(|t| t.priority.cmp(&task.priority).reverse())
328            .unwrap_or_else(|pos| pos);
329
330        self.task_queue.insert(insert_pos, task);
331        Ok(())
332    }
333
334    /// Process distributed time series forecasting
335    pub fn distributed_forecast(
336        &mut self,
337        data: &Array1<F>,
338        horizon: usize,
339        method: &str,
340    ) -> Result<Array1<F>> {
341        // Split data into chunks for parallel processing
342        let chunk_size = self
343            .config
344            .chunk_size
345            .min(data.len() / self.config.nodes.len().max(1));
346        let chunks: Vec<Array1<F>> = data
347            .axis_chunks_iter(Axis(0), chunk_size)
348            .map(|chunk| chunk.to_owned())
349            .collect();
350
351        // Create forecasting tasks for each chunk
352        let mut tasks = Vec::new();
353        for (i, chunk) in chunks.iter().enumerate() {
354            let mut params = HashMap::new();
355            params.insert("horizon".to_string(), horizon as f64);
356            params.insert("chunk_index".to_string(), i as f64);
357
358            let task = DistributedTask {
359                id: format!("forecast_chunk_{i}"),
360                task_type: TaskType::Forecasting,
361                input_data: chunk.clone(),
362                parameters: params,
363                priority: TaskPriority::Normal,
364                dependencies: vec![],
365            };
366
367            tasks.push(task);
368        }
369
370        // Submit tasks
371        for task in tasks {
372            self.submit_task(task)?;
373        }
374
375        // Process tasks (simplified simulation)
376        self.process_pending_tasks()?;
377
378        // Aggregate results
379        self.aggregate_forecast_results(horizon)
380    }
381
382    /// Process distributed feature extraction
383    pub fn distributed_feature_extraction(
384        &mut self,
385        data: &Array1<F>,
386        features: &[String],
387    ) -> Result<Array2<F>> {
388        // Split data into overlapping windows for feature extraction
389        let window_size = 1000.min(data.len() / 2);
390        let overlap = window_size / 4;
391        let step = window_size - overlap;
392
393        let mut tasks = Vec::new();
394        let mut i = 0;
395        let mut start = 0;
396
397        while start + window_size <= data.len() {
398            let end = (start + window_size).min(data.len());
399            let window = data.slice(scirs2_core::ndarray::s![start..end]).to_owned();
400
401            let mut params = HashMap::new();
402            params.insert("window_index".to_string(), i as f64);
403            params.insert("window_size".to_string(), window_size as f64);
404
405            let task = DistributedTask {
406                id: format!("features_window_{i}"),
407                task_type: TaskType::FeatureExtraction,
408                input_data: window,
409                parameters: params,
410                priority: TaskPriority::Normal,
411                dependencies: vec![],
412            };
413
414            tasks.push(task);
415            start += step;
416            i += 1;
417        }
418
419        // Submit tasks
420        for task in tasks {
421            self.submit_task(task)?;
422        }
423
424        // Process tasks
425        self.process_pending_tasks()?;
426
427        // Aggregate feature results
428        self.aggregate_feature_results(features.len())
429    }
430
431    /// Select optimal node for task execution
432    fn select_node_for_task(&mut self, task: &DistributedTask<F>) -> Result<String> {
433        let available_nodes: Vec<&String> = self
434            .nodes
435            .iter()
436            .filter(|(_, info)| {
437                info.status == NodeStatus::Available
438                    && info.running_tasks < self.config.max_concurrent_tasks
439            })
440            .map(|(address, _)| address)
441            .collect();
442
443        if available_nodes.is_empty() {
444            return Err(TimeSeriesError::ComputationError(
445                "No available nodes for task execution".to_string(),
446            ));
447        }
448
449        let selected_node = match self.config.load_balancing {
450            LoadBalancingStrategy::RoundRobin => {
451                let index = self.load_balancer_state.round_robin_counter % available_nodes.len();
452                self.load_balancer_state.round_robin_counter += 1;
453                available_nodes[index].clone()
454            }
455            LoadBalancingStrategy::LoadBased => {
456                // Select node with lowest current load
457                available_nodes
458                    .iter()
459                    .min_by(|a, b| {
460                        let load_a = self
461                            .nodes
462                            .get(*a as &str)
463                            .expect("Operation failed")
464                            .current_load;
465                        let load_b = self
466                            .nodes
467                            .get(*b as &str)
468                            .expect("Operation failed")
469                            .current_load;
470                        load_a
471                            .partial_cmp(&load_b)
472                            .unwrap_or(std::cmp::Ordering::Equal)
473                    })
474                    .expect("Operation failed")
475                    .to_string()
476            }
477            LoadBalancingStrategy::Random => {
478                // Simple hash-based selection for deterministic behavior
479                let hash = task.id.len() % available_nodes.len();
480                available_nodes[hash].clone()
481            }
482            LoadBalancingStrategy::Weighted => {
483                // Select based on node weights (simplified implementation)
484                available_nodes[0].clone() // Fallback to first available
485            }
486        };
487
488        Ok(selected_node)
489    }
490
491    /// Process pending tasks in the queue
492    fn process_pending_tasks(&mut self) -> Result<()> {
493        while let Some(task) = self.task_queue.pop() {
494            // Check if dependencies are satisfied
495            let dependencies_satisfied = task.dependencies.iter().all(|dep_id| {
496                self.completed_tasks
497                    .get(dep_id)
498                    .map(|result| result.status == TaskStatus::Completed)
499                    .unwrap_or(false)
500            });
501
502            if !dependencies_satisfied {
503                // Put task back in queue
504                self.task_queue.push(task);
505                continue;
506            }
507
508            // Select node for execution
509            let node_address = self.select_node_for_task(&task)?;
510
511            // Simulate task execution
512            let result = self.execute_task_on_node(&task, &node_address)?;
513
514            // Store result
515            self.completed_tasks.insert(task.id.clone(), result);
516            self.running_tasks.remove(&task.id);
517        }
518
519        Ok(())
520    }
521
522    /// Execute a task on a specific node (simulated)
523    fn execute_task_on_node(
524        &mut self,
525        task: &DistributedTask<F>,
526        node_address: &str,
527    ) -> Result<TaskResult<F>> {
528        let start_time = Instant::now();
529
530        // Mark task as running
531        self.running_tasks.insert(task.id.clone(), task.clone());
532
533        // Update node status
534        if let Some(node) = self.nodes.get_mut(node_address) {
535            node.running_tasks += 1;
536            node.current_load = node.running_tasks as f64 / self.config.max_concurrent_tasks as f64;
537        }
538
539        // Simulate task execution based on task type
540        let result_data = match task.task_type {
541            TaskType::Forecasting => self.simulate_forecasting_task(task)?,
542            TaskType::FeatureExtraction => self.simulate_feature_extraction_task(task)?,
543            TaskType::AnomalyDetection => self.simulate_anomaly_detection_task(task)?,
544            TaskType::Decomposition => self.simulate_decomposition_task(task)?,
545            _ => {
546                // Generic processing
547                task.input_data.clone()
548            }
549        };
550
551        let execution_time = start_time.elapsed();
552
553        // Update node status
554        if let Some(node) = self.nodes.get_mut(node_address) {
555            node.running_tasks = node.running_tasks.saturating_sub(1);
556            node.current_load = node.running_tasks as f64 / self.config.max_concurrent_tasks as f64;
557        }
558
559        Ok(TaskResult {
560            taskid: task.id.clone(),
561            status: TaskStatus::Completed,
562            data: Some(result_data),
563            metrics: TaskMetrics {
564                execution_time,
565                executed_on: node_address.to_string(),
566                memory_usage: task.input_data.len() * std::mem::size_of::<F>(),
567                cpu_utilization: 0.8,                    // Simulated
568                network_time: Duration::from_millis(10), // Simulated
569            },
570            error: None,
571        })
572    }
573
574    /// Simulate forecasting task execution
575    fn simulate_forecasting_task(&self, task: &DistributedTask<F>) -> Result<Array1<F>> {
576        let horizon = task
577            .parameters
578            .get("horizon")
579            .map(|&h| h as usize)
580            .unwrap_or(10);
581
582        // Simple linear extrapolation for simulation
583        let data = &task.input_data;
584        if data.len() < 2 {
585            return Ok(Array1::zeros(horizon));
586        }
587
588        let slope = (data[data.len() - 1] - data[data.len() - 2]) / F::one();
589        let mut forecast = Array1::zeros(horizon);
590
591        for i in 0..horizon {
592            forecast[i] =
593                data[data.len() - 1] + slope * F::from(i + 1).expect("Failed to convert to float");
594        }
595
596        Ok(forecast)
597    }
598
599    /// Simulate feature extraction task execution
600    fn simulate_feature_extraction_task(&self, task: &DistributedTask<F>) -> Result<Array1<F>> {
601        let data = &task.input_data;
602
603        // Extract basic statistical features
604        let mean = data.mean_or(F::zero());
605        let variance = data.var(F::zero());
606        let min = data.iter().fold(F::infinity(), |acc, &x| acc.min(x));
607        let max = data.iter().fold(F::neg_infinity(), |acc, &x| acc.max(x));
608
609        // Simulate more features
610        let features = vec![
611            mean,
612            variance.sqrt(), // Standard deviation
613            min,
614            max,
615            max - min, // Range
616        ];
617
618        Ok(Array1::from_vec(features))
619    }
620
621    /// Simulate anomaly detection task execution
622    fn simulate_anomaly_detection_task(&self, task: &DistributedTask<F>) -> Result<Array1<F>> {
623        let data = &task.input_data;
624        let mean = data.mean_or(F::zero());
625        let std_dev = data.var(F::zero()).sqrt();
626
627        // Simple z-score based anomaly detection
628        let threshold = F::from(3.0).expect("Failed to convert constant to float");
629        let mut anomaly_scores = Array1::zeros(data.len());
630
631        for (i, &value) in data.iter().enumerate() {
632            let z_score = (value - mean) / std_dev;
633            anomaly_scores[i] = if z_score.abs() > threshold {
634                F::one()
635            } else {
636                F::zero()
637            };
638        }
639
640        Ok(anomaly_scores)
641    }
642
643    /// Simulate decomposition task execution
644    fn simulate_decomposition_task(&self, task: &DistributedTask<F>) -> Result<Array1<F>> {
645        // Simple trend extraction using moving average
646        let data = &task.input_data;
647        let window_size = 10.min(data.len() / 2);
648        let mut trend = Array1::zeros(data.len());
649
650        for i in 0..data.len() {
651            let start = i.saturating_sub(window_size / 2);
652            let end = (i + window_size / 2 + 1).min(data.len());
653
654            let window_sum = data.slice(scirs2_core::ndarray::s![start..end]).sum();
655            let window_len = F::from(end - start).expect("Failed to convert to float");
656            trend[i] = window_sum / window_len;
657        }
658
659        Ok(trend)
660    }
661
662    /// Aggregate forecasting results from multiple tasks
663    fn aggregate_forecast_results(&self, horizon: usize) -> Result<Array1<F>> {
664        let mut all_forecasts = Vec::new();
665        let mut chunk_indices = Vec::new();
666
667        // Collect all forecast results
668        for (taskid, result) in &self.completed_tasks {
669            if taskid.starts_with("forecast_chunk_") && result.status == TaskStatus::Completed {
670                if let Some(data) = &result.data {
671                    all_forecasts.push(data.clone());
672
673                    // Extract chunk index for proper ordering
674                    if let Some(chunk_str) = taskid.strip_prefix("forecast_chunk_") {
675                        if let Ok(index) = chunk_str.parse::<usize>() {
676                            chunk_indices.push(index);
677                        }
678                    }
679                }
680            }
681        }
682
683        if all_forecasts.is_empty() {
684            return Ok(Array1::zeros(horizon));
685        }
686
687        // Sort forecasts by chunk index
688        let mut indexed_forecasts: Vec<(usize, Array1<F>)> =
689            chunk_indices.into_iter().zip(all_forecasts).collect();
690        indexed_forecasts.sort_by_key(|(index_, _)| *index_);
691
692        // Aggregate by averaging (simple ensemble approach)
693        let mut final_forecast = Array1::zeros(horizon);
694        let mut count = 0;
695
696        for (_, forecast) in indexed_forecasts {
697            let actual_horizon = forecast.len().min(horizon);
698            for i in 0..actual_horizon {
699                final_forecast[i] = final_forecast[i] + forecast[i];
700            }
701            count += 1;
702        }
703
704        if count > 0 {
705            final_forecast = final_forecast / F::from(count).expect("Failed to convert to float");
706        }
707
708        Ok(final_forecast)
709    }
710
711    /// Aggregate feature extraction results
712    fn aggregate_feature_results(&self, numfeatures: usize) -> Result<Array2<F>> {
713        let mut all_features = Vec::new();
714        let mut window_indices = Vec::new();
715
716        // Collect all feature results
717        for (taskid, result) in &self.completed_tasks {
718            if taskid.starts_with("features_window_") && result.status == TaskStatus::Completed {
719                if let Some(data) = &result.data {
720                    all_features.push(data.clone());
721
722                    // Extract window index
723                    if let Some(window_str) = taskid.strip_prefix("features_window_") {
724                        if let Ok(index) = window_str.parse::<usize>() {
725                            window_indices.push(index);
726                        }
727                    }
728                }
729            }
730        }
731
732        if all_features.is_empty() {
733            return Ok(Array2::zeros((0, numfeatures)));
734        }
735
736        // Sort by window index
737        let mut indexed_features: Vec<(usize, Array1<F>)> =
738            window_indices.into_iter().zip(all_features).collect();
739        indexed_features.sort_by_key(|(index_, _)| *index_);
740
741        // Combine into matrix
742        let num_windows = indexed_features.len();
743        let feature_size = indexed_features[0].1.len().min(numfeatures);
744        let mut result = Array2::zeros((num_windows, feature_size));
745
746        for (row, (_, features)) in indexed_features.iter().enumerate() {
747            for col in 0..feature_size {
748                if col < features.len() {
749                    result[[row, col]] = features[col];
750                }
751            }
752        }
753
754        Ok(result)
755    }
756
757    /// Get cluster status information
758    pub fn get_cluster_status(&self) -> ClusterStatus {
759        let total_nodes = self.nodes.len();
760        let available_nodes = self
761            .nodes
762            .values()
763            .filter(|node| node.status == NodeStatus::Available)
764            .count();
765
766        let total_running_tasks = self.running_tasks.len();
767        let total_completed_tasks = self.completed_tasks.len();
768        let total_queued_tasks = self.task_queue.len();
769
770        let average_load = if total_nodes > 0 {
771            self.nodes
772                .values()
773                .map(|node| node.current_load)
774                .sum::<f64>()
775                / total_nodes as f64
776        } else {
777            0.0
778        };
779
780        ClusterStatus {
781            total_nodes,
782            available_nodes,
783            total_running_tasks,
784            total_completed_tasks,
785            total_queued_tasks,
786            average_load,
787            nodes: self.nodes.clone(),
788        }
789    }
790
791    /// Clear completed tasks to free memory
792    pub fn clear_completed_tasks(&mut self) {
793        self.completed_tasks.clear();
794    }
795
796    /// Cancel a running task
797    pub fn cancel_task(&mut self, taskid: &str) -> Result<()> {
798        if let Some(_task) = self.running_tasks.remove(taskid) {
799            // Add cancelled result
800            self.completed_tasks.insert(
801                taskid.to_string(),
802                TaskResult {
803                    taskid: taskid.to_string(),
804                    status: TaskStatus::Cancelled,
805                    data: None,
806                    metrics: TaskMetrics::default(),
807                    error: Some("Task cancelled by user".to_string()),
808                },
809            );
810            Ok(())
811        } else {
812            Err(TimeSeriesError::InvalidInput(format!(
813                "Task {taskid} not found in running tasks"
814            )))
815        }
816    }
817}
818
819/// Cluster status information
820#[derive(Debug, Clone)]
821pub struct ClusterStatus {
822    /// Total number of nodes in cluster
823    pub total_nodes: usize,
824    /// Number of available nodes
825    pub available_nodes: usize,
826    /// Number of currently running tasks
827    pub total_running_tasks: usize,
828    /// Number of completed tasks
829    pub total_completed_tasks: usize,
830    /// Number of tasks in queue
831    pub total_queued_tasks: usize,
832    /// Average load across all nodes
833    pub average_load: f64,
834    /// Detailed node information
835    pub nodes: HashMap<String, NodeInfo>,
836}
837
838/// Convenience functions for common distributed operations
839#[allow(dead_code)]
840pub fn distributed_moving_average<
841    F: Float
842        + Debug
843        + Clone
844        + scirs2_core::numeric::FromPrimitive
845        + scirs2_core::numeric::Zero
846        + scirs2_core::ndarray::ScalarOperand,
847>(
848    processor: &mut DistributedProcessor<F>,
849    data: &Array1<F>,
850    window_size: usize,
851) -> Result<Array1<F>> {
852    // Create custom task for moving average computation
853    let task = DistributedTask {
854        id: "moving_average".to_string(),
855        task_type: TaskType::Custom("moving_average".to_string()),
856        input_data: data.clone(),
857        parameters: {
858            let mut params = HashMap::new();
859            params.insert("window_size".to_string(), window_size as f64);
860            params
861        },
862        priority: TaskPriority::Normal,
863        dependencies: vec![],
864    };
865
866    processor.submit_task(task)?;
867    processor.process_pending_tasks()?;
868
869    // Get result
870    if let Some(result) = processor.completed_tasks.get("moving_average") {
871        if let Some(data) = &result.data {
872            Ok(data.clone())
873        } else {
874            Err(TimeSeriesError::ComputationError(
875                "Moving average computation failed".to_string(),
876            ))
877        }
878    } else {
879        Err(TimeSeriesError::ComputationError(
880            "Moving average task not found".to_string(),
881        ))
882    }
883}
884
885#[cfg(test)]
886mod tests {
887    use super::*;
888
889    #[test]
890    fn test_cluster_config_default() {
891        let config = ClusterConfig::default();
892        assert_eq!(config.nodes.len(), 1);
893        assert_eq!(config.max_concurrent_tasks, 4);
894        assert_eq!(config.chunk_size, 10000);
895    }
896
897    #[test]
898    fn test_distributed_processor_creation() {
899        let config = ClusterConfig::default();
900        let processor: DistributedProcessor<f64> = DistributedProcessor::new(config);
901
902        assert_eq!(processor.nodes.len(), 1);
903        assert!(processor.task_queue.is_empty());
904        assert!(processor.running_tasks.is_empty());
905        assert!(processor.completed_tasks.is_empty());
906    }
907
908    #[test]
909    fn test_task_submission() {
910        let config = ClusterConfig::default();
911        let mut processor: DistributedProcessor<f64> = DistributedProcessor::new(config);
912
913        let task = DistributedTask {
914            id: "test_task".to_string(),
915            task_type: TaskType::Forecasting,
916            input_data: Array1::from_vec(vec![1.0, 2.0, 3.0, 4.0, 5.0]),
917            parameters: HashMap::new(),
918            priority: TaskPriority::Normal,
919            dependencies: vec![],
920        };
921
922        assert!(processor.submit_task(task).is_ok());
923        assert_eq!(processor.task_queue.len(), 1);
924    }
925
926    #[test]
927    fn test_task_priority_ordering() {
928        let config = ClusterConfig::default();
929        let mut processor: DistributedProcessor<f64> = DistributedProcessor::new(config);
930
931        // Submit tasks with different priorities
932        let low_task = DistributedTask {
933            id: "low".to_string(),
934            task_type: TaskType::Forecasting,
935            input_data: Array1::zeros(10),
936            parameters: HashMap::new(),
937            priority: TaskPriority::Low,
938            dependencies: vec![],
939        };
940
941        let high_task = DistributedTask {
942            id: "high".to_string(),
943            task_type: TaskType::Forecasting,
944            input_data: Array1::zeros(10),
945            parameters: HashMap::new(),
946            priority: TaskPriority::High,
947            dependencies: vec![],
948        };
949
950        processor.submit_task(low_task).expect("Operation failed");
951        processor.submit_task(high_task).expect("Operation failed");
952
953        // High priority task should be first
954        assert_eq!(processor.task_queue[0].priority, TaskPriority::High);
955        assert_eq!(processor.task_queue[1].priority, TaskPriority::Low);
956    }
957
958    #[test]
959    fn test_distributed_forecasting() {
960        let config = ClusterConfig::default();
961        let mut processor: DistributedProcessor<f64> = DistributedProcessor::new(config);
962
963        let data = Array1::from_vec((1..100).map(|x| x as f64).collect());
964        let horizon = 10;
965
966        let result = processor.distributed_forecast(&data, horizon, "linear");
967        assert!(result.is_ok());
968
969        let forecast = result.expect("Operation failed");
970        assert_eq!(forecast.len(), horizon);
971    }
972
973    #[test]
974    fn test_cluster_status() {
975        let config = ClusterConfig::default();
976        let processor: DistributedProcessor<f64> = DistributedProcessor::new(config);
977
978        let status = processor.get_cluster_status();
979        assert_eq!(status.total_nodes, 1);
980        assert_eq!(status.available_nodes, 1);
981        assert_eq!(status.total_running_tasks, 0);
982        assert_eq!(status.total_completed_tasks, 0);
983        assert_eq!(status.total_queued_tasks, 0);
984    }
985
986    #[test]
987    fn test_load_balancing_strategies() {
988        // Test that different strategies are properly defined
989        assert_ne!(
990            LoadBalancingStrategy::RoundRobin,
991            LoadBalancingStrategy::LoadBased
992        );
993        assert_ne!(
994            LoadBalancingStrategy::Random,
995            LoadBalancingStrategy::Weighted
996        );
997    }
998
999    #[test]
1000    fn test_task_status_enum() {
1001        assert_eq!(TaskStatus::Pending, TaskStatus::Pending);
1002        assert_ne!(TaskStatus::Running, TaskStatus::Completed);
1003    }
1004
1005    #[test]
1006    fn test_fault_tolerance_config() {
1007        let config = FaultToleranceConfig::default();
1008        assert_eq!(config.max_retries, 3);
1009        assert_eq!(config.replication_factor, 2);
1010        assert!(!config.enable_replication);
1011    }
1012}