1#[cfg(feature = "distributed")]
7use serde::{Deserialize, Serialize};
8#[cfg(feature = "distributed")]
9use std::collections::HashMap;
10#[cfg(feature = "distributed")]
11use std::collections::VecDeque;
12#[cfg(feature = "distributed")]
13use std::sync::Arc;
14#[cfg(feature = "distributed")]
15use tokio::sync::{mpsc, RwLock};
16
17use crate::error::{Result, TransformError};
18use scirs2_core::ndarray::{Array2, ArrayView2};
19
/// Unique identifier for a worker node in the cluster.
pub type NodeId = String;

/// Unique identifier for a distributed task.
pub type TaskId = String;
25
/// Cluster-wide configuration for distributed execution.
#[cfg(feature = "distributed")]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DistributedConfig {
    /// Worker nodes participating in the cluster.
    pub nodes: Vec<NodeInfo>,
    /// Maximum number of tasks allowed to run concurrently.
    pub max_concurrent_tasks: usize,
    /// Per-task timeout in seconds.
    pub timeout_seconds: u64,
    /// Strategy used to split input matrices across nodes.
    pub partitioning_strategy: PartitioningStrategy,
}
39
/// Static description of a single worker node.
#[cfg(feature = "distributed")]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct NodeInfo {
    /// Unique node identifier.
    pub id: NodeId,
    /// Network address (hostname or IP) of the node.
    pub address: String,
    /// TCP port the node listens on.
    pub port: u16,
    /// Available memory in gigabytes; used for capacity scoring.
    pub memory_gb: f64,
    /// Number of CPU cores; used for capacity scoring.
    pub cpu_cores: usize,
    /// Whether the node has a GPU (boosts its score for transform tasks).
    pub has_gpu: bool,
}
57
/// How an input matrix is split into per-node partitions.
#[cfg(feature = "distributed")]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum PartitioningStrategy {
    /// Split by rows (samples), proportional to node capacity.
    RowWise,
    /// Split by columns (features), evenly across nodes.
    ColumnWise,
    /// Split into fixed-size (rows, cols) blocks.
    BlockWise {
        /// Block dimensions as (rows, columns).
        block_size: (usize, usize),
    },
    /// Pick one of the above from data-shape/size heuristics at runtime.
    Adaptive,
}
74
/// A unit of work dispatched to a worker node.
#[cfg(feature = "distributed")]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum DistributedTask {
    /// Fit a transformer on one data partition.
    Fit {
        /// Identifier used to retrieve this task's result.
        task_id: TaskId,
        /// Name of the transformer to fit (e.g. "PCA").
        transformer_type: String,
        /// Scalar hyperparameters by name.
        parameters: HashMap<String, f64>,
        /// Row-wise partition of the input matrix.
        data_partition: Vec<Vec<f64>>,
    },
    /// Apply an already-fitted transformer to one data partition.
    Transform {
        /// Identifier used to retrieve this task's result.
        task_id: TaskId,
        /// Serialized fitted-transformer state (oxicode bytes).
        transformer_state: Vec<u8>,
        /// Row-wise partition of the input matrix.
        data_partition: Vec<Vec<f64>>,
    },
    /// Combine partial results from earlier tasks.
    Aggregate {
        /// Identifier used to retrieve this task's result.
        task_id: TaskId,
        /// Serialized partial results to merge.
        partial_results: Vec<Vec<u8>>,
    },
}
107
/// Outcome of one executed [`DistributedTask`].
#[cfg(feature = "distributed")]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TaskResult {
    /// Task this result belongs to.
    pub task_id: TaskId,
    /// Node that executed the task.
    pub node_id: NodeId,
    /// Serialized (oxicode) task output.
    pub result: Vec<u8>,
    /// Wall-clock execution time in milliseconds.
    pub execution_time_ms: u64,
    /// Estimated peak memory used, in megabytes.
    pub memory_used_mb: f64,
}
123
/// Central scheduler: accepts tasks, assigns them to the best-scoring node,
/// and caches their results for retrieval by task id.
#[cfg(feature = "distributed")]
pub struct DistributedCoordinator {
    // Cluster configuration this coordinator was created with.
    config: DistributedConfig,
    // Nodes indexed by id for scheduling lookups.
    nodes: Arc<RwLock<HashMap<NodeId, NodeInfo>>>,
    // NOTE(review): initialized but not used by any method visible here.
    task_queue: Arc<RwLock<Vec<DistributedTask>>>,
    // Completed results cached by task id.
    results: Arc<RwLock<HashMap<TaskId, TaskResult>>>,
    // Feeds the background dispatch loop started in `new`.
    task_sender: mpsc::UnboundedSender<DistributedTask>,
    // Receiving side drained lazily by `get_result`.
    result_receiver: Arc<RwLock<mpsc::UnboundedReceiver<TaskResult>>>,
}
134
135#[cfg(feature = "distributed")]
136impl DistributedCoordinator {
    /// Creates a coordinator, registers all configured nodes, and starts the
    /// background dispatcher that assigns queued tasks to nodes.
    ///
    /// # Errors
    /// Propagates any failure from starting the worker loop.
    pub async fn new(config: DistributedConfig) -> Result<Self> {
        let (task_sender, task_receiver) = mpsc::unbounded_channel();
        let (result_sender, result_receiver) = mpsc::unbounded_channel();

        // Index nodes by id for O(1) lookup during scheduling.
        let mut nodes = HashMap::new();
        for node in &config.nodes {
            nodes.insert(node.id.clone(), node.clone());
        }

        let coordinator = DistributedCoordinator {
            config,
            nodes: Arc::new(RwLock::new(nodes)),
            task_queue: Arc::new(RwLock::new(Vec::new())),
            results: Arc::new(RwLock::new(HashMap::new())),
            task_sender,
            result_receiver: Arc::new(RwLock::new(result_receiver)),
        };

        // Spawn the dispatch loop before handing the coordinator to callers,
        // so submitted tasks are consumed immediately.
        coordinator
            .start_workers(task_receiver, result_sender)
            .await?;

        Ok(coordinator)
    }
163
    /// Spawns the background dispatch loop: for each received task, selects
    /// the best node and runs the task on it in its own spawned task.
    ///
    /// NOTE(review): a task for which no node can be selected, or whose
    /// execution fails, is silently dropped here — `get_result` for such a
    /// task will poll forever. Confirm whether that is intended.
    async fn start_workers(
        &self,
        mut task_receiver: mpsc::UnboundedReceiver<DistributedTask>,
        result_sender: mpsc::UnboundedSender<TaskResult>,
    ) -> Result<()> {
        let nodes = self.nodes.clone();

        tokio::spawn(async move {
            // Loop ends when the task sender side is dropped.
            while let Some(task) = task_receiver.recv().await {
                // Hold the read lock only while scoring nodes.
                let nodes_guard = nodes.read().await;
                let available_node = Self::select_best_node(&*nodes_guard, &task);

                if let Some(node) = available_node {
                    let result_sender_clone = result_sender.clone();
                    let node_clone = node.clone();
                    let task_clone = task.clone();

                    // Execute each task concurrently in its own tokio task.
                    tokio::spawn(async move {
                        if let Ok(result) =
                            Self::execute_task_on_node(&node_clone, &task_clone).await
                        {
                            // Receiver may already be gone; ignore send errors.
                            let _ = result_sender_clone.send(result);
                        }
                    });
                }
            }
        });

        Ok(())
    }
195
196 fn select_best_node(
198 nodes: &HashMap<NodeId, NodeInfo>,
199 task: &DistributedTask,
200 ) -> Option<NodeInfo> {
201 if nodes.is_empty() {
202 return None;
203 }
204
205 nodes
207 .values()
208 .map(|node| {
209 let mut score = 0.0;
210
211 score += node.memory_gb * 2.0; score += node.cpu_cores as f64 * 1.5; match task {
217 DistributedTask::Fit { data_partition, .. } => {
218 let data_size_gb = (data_partition.len() * std::mem::size_of::<Vec<f64>>())
220 as f64
221 / (1024.0 * 1024.0 * 1024.0);
222 if node.memory_gb > data_size_gb * 3.0 {
223 score += 5.0; }
225 if node.has_gpu {
226 score += 3.0; }
228 }
229 DistributedTask::Transform { .. } => {
230 score += node.cpu_cores as f64 * 0.5;
232 if node.has_gpu {
233 score += 8.0; }
235 }
236 DistributedTask::Aggregate {
237 partial_results, ..
238 } => {
239 let total_data_gb = partial_results
241 .iter()
242 .map(|r| r.len() as f64 / (1024.0 * 1024.0 * 1024.0))
243 .sum::<f64>();
244 if node.memory_gb > total_data_gb * 2.0 {
245 score += 4.0;
246 }
247 score += node.cpu_cores as f64 * 0.3; }
249 }
250
251 let network_penalty = if node.address.starts_with("192.168")
253 || node.address.starts_with("10.")
254 || node.address == "localhost"
255 {
256 0.0 } else {
258 -2.0 };
260 score += network_penalty;
261
262 (node.clone(), score)
263 })
264 .max_by(|(_, score_a), (_, score_b)| {
265 score_a
266 .partial_cmp(score_b)
267 .unwrap_or(std::cmp::Ordering::Equal)
268 })
269 .map(|(node_, _)| node_)
270 }
271
    /// Sends a task to a node, retrying up to `MAX_RETRIES` times with
    /// exponential backoff (1 s, 2 s, 4 s) between attempts.
    ///
    /// # Errors
    /// Returns the last attempt's error once all retries are exhausted.
    async fn send_task_to_node(node: &NodeInfo, task: &DistributedTask) -> Result<Vec<u8>> {
        const MAX_RETRIES: usize = 3;
        const RETRY_DELAY_MS: u64 = 1000;

        let mut last_error = None;

        for attempt in 0..MAX_RETRIES {
            match Self::send_task_to_node_once(node, task).await {
                Ok(result) => return Ok(result),
                Err(e) => {
                    last_error = Some(e);
                    // Back off exponentially; no delay after the final attempt.
                    if attempt < MAX_RETRIES - 1 {
                        let delay = RETRY_DELAY_MS * (2_u64.pow(attempt as u32));
                        tokio::time::sleep(std::time::Duration::from_millis(delay)).await;
                    }
                }
            }
        }

        // Unreachable fallback: MAX_RETRIES > 0 guarantees last_error is set.
        Err(last_error.unwrap_or_else(|| {
            TransformError::DistributedError("Unknown error in task execution".to_string())
        }))
    }
297
    /// Executes a single attempt of a task "on" a node.
    ///
    /// NOTE(review): this currently simulates remote execution — the request
    /// URL and compressed payload are built but unused, the task runs
    /// locally via the `execute_*` helpers, and a synthetic network delay is
    /// slept afterwards. Confirm before wiring in real transport.
    async fn send_task_to_node_once(node: &NodeInfo, task: &DistributedTask) -> Result<Vec<u8>> {
        // Reject obviously invalid node configurations up front.
        if node.address.is_empty() || node.port == 0 {
            return Err(TransformError::DistributedError(format!(
                "Invalid node configuration: {}:{}",
                node.address, node.port
            )));
        }

        let cfg = oxicode::config::standard();
        let task_data = oxicode::serde::encode_to_vec(task, cfg).map_err(|e| {
            TransformError::DistributedError(format!("Failed to serialize task (oxicode): {}", e))
        })?;

        // Placeholder transport artifacts; currently unused (see note above).
        let _compressed_data = Self::compress_data(&task_data)?;

        let _url = format!("http://{}:{}/api/execute", node.address, node.port);

        let start_time = std::time::Instant::now();

        // NOTE(review): the partitions are encoded here as Vec<Vec<f64>>;
        // the execute_* helpers must decode the same shape.
        let result = match task {
            DistributedTask::Fit {
                task_id: _,
                transformer_type: _,
                parameters: _,
                data_partition,
            } => {
                let cfg = oxicode::config::standard();
                let serialized_data =
                    oxicode::serde::encode_to_vec(data_partition, cfg).map_err(|e| {
                        TransformError::DistributedError(format!(
                            "Failed to serialize fit data (oxicode): {}",
                            e
                        ))
                    })?;
                Self::execute_fit_task(&serialized_data).await?
            }
            DistributedTask::Transform {
                task_id: _,
                transformer_state,
                data_partition,
            } => {
                let cfg = oxicode::config::standard();
                let serialized_data =
                    oxicode::serde::encode_to_vec(data_partition, cfg).map_err(|e| {
                        TransformError::DistributedError(format!(
                            "Failed to serialize transform data (oxicode): {}",
                            e
                        ))
                    })?;
                Self::execute_transform_task(&serialized_data, transformer_state).await?
            }
            DistributedTask::Aggregate {
                task_id: _,
                partial_results,
            } => Self::execute_aggregate_task(partial_results).await?,
        };

        // Simulate transfer latency proportional to payload size.
        let network_delay = Self::calculate_network_delay(&task_data, node);
        tokio::time::sleep(std::time::Duration::from_millis(network_delay)).await;

        // NOTE(review): this timeout is checked only after the work has
        // already completed, so it discards a finished result rather than
        // cancelling a slow task.
        let elapsed = start_time.elapsed();
        if elapsed.as_secs() > 300 {
            return Err(TransformError::DistributedError(
                "Task execution timeout exceeded".to_string(),
            ));
        }

        Ok(result)
    }
377
378 fn compress_data(data: &[u8]) -> Result<Vec<u8>> {
380 if data.len() > 1024 {
382 Ok(data[..data.len() / 2].to_vec())
384 } else {
385 Ok(data.to_vec())
386 }
387 }
388
389 fn calculate_network_delay(data: &[u8], node: &NodeInfo) -> u64 {
391 let data_size_mb = data.len() as f64 / (1024.0 * 1024.0);
392
393 let base_latency_ms = if node.address.starts_with("192.168")
395 || node.address.starts_with("10.")
396 || node.address == "localhost"
397 {
398 5 } else {
400 50 };
402
403 let bandwidth_mbps = if node.address == "localhost" {
405 1000.0 } else if node.address.starts_with("192.168") || node.address.starts_with("10.") {
407 100.0 } else {
409 10.0 };
411
412 let transfer_time_ms = (data_size_mb / bandwidth_mbps * 1000.0) as u64;
413
414 base_latency_ms + transfer_time_ms
415 }
416
417 async fn execute_fit_task(data: &[u8]) -> Result<Vec<u8>> {
419 let cfg = oxicode::config::standard();
421 let (input_data, _len): (Vec<f64>, usize) =
422 oxicode::serde::decode_owned_from_slice(data, cfg).map_err(|e| {
423 TransformError::DistributedError(format!(
424 "Failed to deserialize fit data (oxicode): {}",
425 e
426 ))
427 })?;
428
429 let mean = input_data.iter().sum::<f64>() / input_data.len() as f64;
431 let variance =
432 input_data.iter().map(|x| (x - mean).powi(2)).sum::<f64>() / input_data.len() as f64;
433
434 let fit_params = vec![mean, variance.sqrt()]; let cfg = oxicode::config::standard();
437 oxicode::serde::encode_to_vec(&fit_params, cfg).map_err(|e| {
438 TransformError::DistributedError(format!(
439 "Failed to serialize fit results (oxicode): {}",
440 e
441 ))
442 })
443 }
444
445 async fn execute_transform_task(data: &[u8], params: &[u8]) -> Result<Vec<u8>> {
447 let cfg = oxicode::config::standard();
449 let (input_data, _len): (Vec<f64>, usize) =
450 oxicode::serde::decode_owned_from_slice(data, cfg).map_err(|e| {
451 TransformError::DistributedError(format!(
452 "Failed to deserialize transform data (oxicode): {}",
453 e
454 ))
455 })?;
456
457 let (fit_params, _len): (Vec<f64>, usize) =
458 oxicode::serde::decode_owned_from_slice(params, cfg).map_err(|e| {
459 TransformError::DistributedError(format!(
460 "Failed to deserialize transform params (oxicode): {}",
461 e
462 ))
463 })?;
464
465 if fit_params.len() < 2 {
466 return Err(TransformError::DistributedError(
467 "Invalid fit parameters for transform".to_string(),
468 ));
469 }
470
471 let mean = fit_params[0];
472 let std = fit_params[1];
473
474 let transformed_data: Vec<f64> = input_data.iter().map(|x| (x - mean) / std).collect();
476
477 oxicode::serde::encode_to_vec(&transformed_data, cfg).map_err(|e| {
478 TransformError::DistributedError(format!(
479 "Failed to serialize transform results (oxicode): {}",
480 e
481 ))
482 })
483 }
484
    /// Merges partial results (each an oxicode-encoded `Vec<f64>`) into a
    /// single encoded summary `[mean, min, max, count]`.
    ///
    /// NOTE(review): when the partials are `[mean, std]` pairs from fit
    /// tasks, this computes statistics over the concatenated parameters,
    /// not over the original data — confirm that is the intended semantics.
    async fn execute_aggregate_task(_partialresults: &[Vec<u8>]) -> Result<Vec<u8>> {
        let mut all_data = Vec::new();

        for result_data in _partialresults {
            let cfg = oxicode::config::standard();
            let (partial_data, _len): (Vec<f64>, usize) =
                oxicode::serde::decode_owned_from_slice(result_data, cfg).map_err(|e| {
                    TransformError::DistributedError(format!(
                        "Failed to deserialize partial result (oxicode): {}",
                        e
                    ))
                })?;
            all_data.extend(partial_data);
        }

        if all_data.is_empty() {
            return Err(TransformError::DistributedError(
                "No data to aggregate".to_string(),
            ));
        }

        // Overall mean / min / max over all concatenated values.
        let mean = all_data.iter().sum::<f64>() / all_data.len() as f64;
        let min_val = all_data.iter().fold(f64::INFINITY, |a, &b| a.min(b));
        let max_val = all_data.iter().fold(f64::NEG_INFINITY, |a, &b| a.max(b));

        let aggregated_result = vec![mean, min_val, max_val, all_data.len() as f64];

        let cfg = oxicode::config::standard();
        oxicode::serde::encode_to_vec(&aggregated_result, cfg).map_err(|e| {
            TransformError::DistributedError(format!(
                "Failed to serialize aggregated _results (oxicode): {}",
                e
            ))
        })
    }
523
524 async fn execute_task_on_node(node: &NodeInfo, task: &DistributedTask) -> Result<TaskResult> {
526 let start_time = std::time::Instant::now();
527
528 let result = Self::send_task_to_node(node, task).await?;
530
531 let execution_time = start_time.elapsed();
532
533 let memory_used_mb = Self::estimate_memory_usage(task, &result);
535
536 Ok(TaskResult {
537 task_id: match task {
538 DistributedTask::Fit { task_id, .. } => task_id.clone(),
539 DistributedTask::Transform { task_id, .. } => task_id.clone(),
540 DistributedTask::Aggregate { task_id, .. } => task_id.clone(),
541 },
542 node_id: node.id.clone(),
543 result,
544 execution_time_ms: execution_time.as_millis() as u64,
545 memory_used_mb,
546 })
547 }
548
549 fn estimate_memory_usage(task: &DistributedTask, result: &[u8]) -> f64 {
551 let base_overhead = 10.0; let result_size_mb = result.len() as f64 / (1024.0 * 1024.0);
553
554 match task {
555 DistributedTask::Fit { data_partition, .. } => {
556 let data_size_mb = (data_partition.len() * std::mem::size_of::<Vec<f64>>()) as f64
558 / (1024.0 * 1024.0);
559 let computation_overhead = data_size_mb * 2.5; base_overhead + data_size_mb + computation_overhead + result_size_mb
561 }
562 DistributedTask::Transform {
563 data_partition,
564 transformer_state,
565 ..
566 } => {
567 let data_size_mb = (data_partition.len() * std::mem::size_of::<Vec<f64>>()) as f64
569 / (1024.0 * 1024.0);
570 let state_size_mb = transformer_state.len() as f64 / (1024.0 * 1024.0);
571 base_overhead + data_size_mb + state_size_mb + result_size_mb
572 }
573 DistributedTask::Aggregate {
574 partial_results, ..
575 } => {
576 let input_size_mb = partial_results
578 .iter()
579 .map(|r| r.len() as f64 / (1024.0 * 1024.0))
580 .sum::<f64>();
581 base_overhead + input_size_mb + result_size_mb
582 }
583 }
584 }
585
    /// Enqueues a task for asynchronous execution by the dispatch loop.
    ///
    /// # Errors
    /// Fails only when the dispatcher (channel receiver) has shut down.
    /// NOTE(review): this wraps the failure as `ComputationError` while the
    /// rest of this module uses `DistributedError` — confirm callers before
    /// unifying the variant.
    pub async fn submit_task(&self, task: DistributedTask) -> Result<()> {
        self.task_sender.send(task).map_err(|e| {
            TransformError::ComputationError(format!("Failed to submit task: {}", e))
        })?;
        Ok(())
    }
593
    /// Waits for the result of `taskid`, polling the result channel every
    /// 10 ms and caching results of other tasks along the way.
    ///
    /// NOTE(review): this loop has no timeout — if the task was dropped by
    /// the dispatcher (no node selected, or execution failed), this awaits
    /// forever. Consider honoring `config.timeout_seconds`.
    pub async fn get_result(&self, taskid: &TaskId) -> Result<TaskResult> {
        loop {
            // Fast path: result already cached (scoped so the read lock is
            // released before touching the receiver).
            {
                let results_guard = self.results.read().await;
                if let Some(result) = results_guard.get(taskid) {
                    return Ok(result.clone());
                }
            }

            // Drain at most one pending result, cache it, and check whether
            // it is the one we are waiting for.
            let mut receiver_guard = self.result_receiver.write().await;
            if let Ok(result) = receiver_guard.try_recv() {
                let mut results_guard = self.results.write().await;
                results_guard.insert(result.task_id.clone(), result.clone());
                drop(results_guard);
                drop(receiver_guard);

                if &result.task_id == taskid {
                    return Ok(result);
                }
            } else {
                // Nothing pending: release the lock and back off briefly.
                drop(receiver_guard);
                tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
            }
        }
    }
621}
622
/// PCA whose fit/transform work is farmed out to cluster nodes through a
/// [`DistributedCoordinator`].
#[cfg(feature = "distributed")]
pub struct DistributedPCA {
    // Number of principal components to keep.
    n_components: usize,
    // Executes the distributed fit/transform/aggregate tasks.
    coordinator: DistributedCoordinator,
    // Fitted components, shape (n_components, n_features); None until fit.
    components: Option<Array2<f64>>,
    // NOTE(review): never written by the methods visible here.
    mean: Option<Array2<f64>>,
}
631
632#[cfg(feature = "distributed")]
633impl DistributedPCA {
634 pub async fn new(_ncomponents: usize, config: DistributedConfig) -> Result<Self> {
636 let coordinator = DistributedCoordinator::new(config).await?;
637
638 Ok(DistributedPCA {
639 coordinator,
640 n_components: _ncomponents,
641 components: None,
642 mean: None,
643 })
644 }
645
    /// Fits the model: partitions `x`, submits one Fit task per partition,
    /// aggregates the partial results, and stores the decoded components.
    ///
    /// NOTE(review): `execute_aggregate_task` currently returns 4 summary
    /// values, which will only reshape to (n_components, n_features) when
    /// those dimensions multiply to 4 — confirm the aggregation contract.
    pub async fn fit(&mut self, x: &ArrayView2<'_, f64>) -> Result<()> {
        let (_n_samples, n_features) = x.dim();

        let partitions = self.partition_data(x).await?;

        // One Fit task per partition, ids "pca_fit_{i}".
        let mut task_ids = Vec::new();
        for (i, partition) in partitions.iter().enumerate() {
            let task_id = format!("pca_fit_{}", i);
            let task = DistributedTask::Fit {
                task_id: task_id.clone(),
                transformer_type: "PCA".to_string(),
                parameters: [("n_components".to_string(), self.n_components as f64)]
                    .iter()
                    .cloned()
                    .collect(),
                data_partition: partition.clone(),
            };

            self.coordinator.submit_task(task).await?;
            task_ids.push(task_id);
        }

        // Collect partial results in submission order.
        let mut partial_results = Vec::new();
        for task_id in task_ids {
            let result = self.coordinator.get_result(&task_id).await?;
            partial_results.push(result.result);
        }

        // Reduce all partials in a single Aggregate task.
        let aggregate_task_id = "pca_aggregate".to_string();
        let aggregate_task = DistributedTask::Aggregate {
            task_id: aggregate_task_id.clone(),
            partial_results,
        };

        self.coordinator.submit_task(aggregate_task).await?;
        let final_result = self.coordinator.get_result(&aggregate_task_id).await?;

        let cfg = oxicode::config::standard();
        let (components, _len): (Vec<f64>, usize) =
            oxicode::serde::decode_owned_from_slice(&final_result.result, cfg).map_err(|e| {
                TransformError::ComputationError(format!(
                    "Failed to deserialize components (oxicode): {}",
                    e
                ))
            })?;

        self.components = Some(
            Array2::from_shape_vec((self.n_components, n_features), components).map_err(|e| {
                TransformError::ComputationError(format!("Failed to reshape components: {}", e))
            })?,
        );

        Ok(())
    }
707
708 pub async fn transform(&self, x: &ArrayView2<'_, f64>) -> Result<Array2<f64>> {
710 if self.components.is_none() {
711 return Err(TransformError::NotFitted(
712 "PCA model not fitted".to_string(),
713 ));
714 }
715
716 let partitions = self.partition_data(x).await?;
717 let mut task_ids = Vec::new();
718
719 for (i, partition) in partitions.iter().enumerate() {
721 let task_id = format!("pca_transform_{}", i);
722 let cfg = oxicode::config::standard();
723 let transformer_state = oxicode::serde::encode_to_vec(
724 self.components.as_ref().expect("Operation failed"),
725 cfg,
726 )
727 .expect("Operation failed");
728
729 let task = DistributedTask::Transform {
730 task_id: task_id.clone(),
731 transformer_state,
732 data_partition: partition.clone(),
733 };
734
735 self.coordinator.submit_task(task).await?;
736 task_ids.push(task_id);
737 }
738
739 let mut all_results = Vec::new();
741 for task_id in task_ids {
742 let result = self.coordinator.get_result(&task_id).await?;
743 let cfg = oxicode::config::standard();
744 let (transformed_partition, _len): (Vec<f64>, usize) =
745 oxicode::serde::decode_owned_from_slice(&result.result, cfg)
746 .expect("Operation failed");
747 all_results.extend(transformed_partition);
748 }
749
750 let (n_samples_, _) = x.dim();
752 Array2::from_shape_vec((n_samples_, self.n_components), all_results).map_err(|e| {
753 TransformError::ComputationError(format!("Failed to reshape result: {}", e))
754 })
755 }
756
757 async fn partition_data(&self, x: &ArrayView2<'_, f64>) -> Result<Vec<Vec<Vec<f64>>>> {
759 let _n_samples_n_features = x.dim();
760 let nodes = self.coordinator.nodes.read().await;
761
762 match &self.coordinator.config.partitioning_strategy {
763 PartitioningStrategy::RowWise => self.partition_rowwise(x, &*nodes).await,
764 PartitioningStrategy::ColumnWise => self.partition_columnwise(x, &*nodes).await,
765 PartitioningStrategy::BlockWise { block_size } => {
766 self.partition_blockwise(x, &*nodes, *block_size).await
767 }
768 PartitioningStrategy::Adaptive => self.partition_adaptive(x, &*nodes).await,
769 }
770 }
771
772 async fn partition_rowwise(
774 &self,
775 x: &ArrayView2<'_, f64>,
776 nodes: &HashMap<NodeId, NodeInfo>,
777 ) -> Result<Vec<Vec<Vec<f64>>>> {
778 let (n_samples_, _) = x.dim();
779 let n_nodes = nodes.len();
780
781 if n_nodes == 0 {
782 return Err(TransformError::DistributedError(
783 "No nodes available".to_string(),
784 ));
785 }
786
787 let total_capacity: f64 = nodes
789 .values()
790 .map(|node| node.memory_gb + node.cpu_cores as f64)
791 .sum();
792
793 let mut partitions = Vec::new();
794 let mut current_row = 0;
795
796 for node in nodes.values() {
797 let node_capacity = node.memory_gb + node.cpu_cores as f64;
798 let capacity_ratio = node_capacity / total_capacity;
799 let rows_for_node = ((n_samples_ as f64 * capacity_ratio) as usize).max(1);
800 let end_row = (current_row + rows_for_node).min(n_samples_);
801
802 if current_row < end_row {
803 let partition = x.slice(scirs2_core::ndarray::s![current_row..end_row, ..]);
804 let partition_vec: Vec<Vec<f64>> = partition
805 .rows()
806 .into_iter()
807 .map(|row| row.to_vec())
808 .collect();
809 partitions.push(partition_vec);
810 current_row = end_row;
811 }
812
813 if current_row >= n_samples_ {
814 break;
815 }
816 }
817
818 Ok(partitions)
819 }
820
821 async fn partition_columnwise(
823 &self,
824 x: &ArrayView2<'_, f64>,
825 nodes: &HashMap<NodeId, NodeInfo>,
826 ) -> Result<Vec<Vec<Vec<f64>>>> {
827 let (_n_samples, n_features) = x.dim();
828 let n_nodes = nodes.len();
829
830 if n_nodes == 0 {
831 return Err(TransformError::DistributedError(
832 "No nodes available".to_string(),
833 ));
834 }
835
836 let features_per_node = (n_features + n_nodes - 1) / n_nodes;
837 let mut partitions = Vec::new();
838
839 for i in 0..n_nodes {
840 let start_col = i * features_per_node;
841 let end_col = ((i + 1) * features_per_node).min(n_features);
842
843 if start_col < end_col {
844 let partition = x.slice(scirs2_core::ndarray::s![.., start_col..end_col]);
845 let partition_vec: Vec<Vec<f64>> = partition
846 .rows()
847 .into_iter()
848 .map(|row| row.to_vec())
849 .collect();
850 partitions.push(partition_vec);
851 }
852 }
853
854 Ok(partitions)
855 }
856
    /// Tiles `x` into (block_rows x block_cols) blocks and deals the blocks
    /// out to nodes round-robin, flattening each node's blocks into one
    /// row-list partition.
    ///
    /// # Errors
    /// Fails when no nodes are available.
    async fn partition_blockwise(
        &self,
        x: &ArrayView2<'_, f64>,
        nodes: &HashMap<NodeId, NodeInfo>,
        block_size: (usize, usize),
    ) -> Result<Vec<Vec<Vec<f64>>>> {
        let (n_samples, n_features) = x.dim();
        let (block_rows, block_cols) = block_size;
        let n_nodes = nodes.len();

        if n_nodes == 0 {
            return Err(TransformError::DistributedError(
                "No nodes available".to_string(),
            ));
        }

        // Block grid dimensions (ceiling division on both axes).
        let blocks_per_row = (n_features + block_cols - 1) / block_cols;
        let blocks_per_col = (n_samples + block_rows - 1) / block_rows;
        let total_blocks = blocks_per_row * blocks_per_col;

        // Each node takes up to this many consecutive blocks.
        let blocks_per_node = (total_blocks + n_nodes - 1) / n_nodes;
        let mut partitions = Vec::new();
        let mut block_idx = 0;

        for _node_idx in 0..n_nodes {
            let mut node_partition = Vec::new();

            for _ in 0..blocks_per_node {
                if block_idx >= total_blocks {
                    break;
                }

                // Linear block index -> (block_row, block_col) in the grid.
                let block_row = block_idx / blocks_per_row;
                let block_col = block_idx % blocks_per_row;

                // Clamp the last row/column of blocks to the matrix edge.
                let start_row = block_row * block_rows;
                let end_row = ((block_row + 1) * block_rows).min(n_samples);
                let start_col = block_col * block_cols;
                let end_col = ((block_col + 1) * block_cols).min(n_features);

                if start_row < end_row && start_col < end_col {
                    let block = x.slice(scirs2_core::ndarray::s![
                        start_row..end_row,
                        start_col..end_col
                    ]);
                    // Blocks lose their 2-D position here; rows are appended
                    // sequentially into the node's partition.
                    for row in block.rows() {
                        node_partition.push(row.to_vec());
                    }
                }

                block_idx += 1;
            }

            if !node_partition.is_empty() {
                partitions.push(node_partition);
            }
        }

        Ok(partitions)
    }
919
920 async fn partition_adaptive(
922 &self,
923 x: &ArrayView2<'_, f64>,
924 nodes: &HashMap<NodeId, NodeInfo>,
925 ) -> Result<Vec<Vec<Vec<f64>>>> {
926 let (n_samples, n_features) = x.dim();
927
928 let _data_density = self.calculate_data_density(x)?;
930 let feature_correlation = self.estimate_feature_correlation(x)?;
931 let data_size_gb = (n_samples * n_features * std::mem::size_of::<f64>()) as f64
932 / (1024.0 * 1024.0 * 1024.0);
933
934 if n_features > n_samples * 2 && feature_correlation < 0.3 {
936 self.partition_columnwise(x, nodes).await
938 } else if data_size_gb > 10.0 && nodes.len() > 4 {
939 let optimal_block_size = self.calculate_optimal_block_size(x, nodes)?;
941 self.partition_blockwise(x, nodes, optimal_block_size).await
942 } else {
943 self.partition_rowwise(x, nodes).await
945 }
946 }
947
948 fn calculate_data_density(&self, x: &ArrayView2<f64>) -> Result<f64> {
950 let total_elements = x.len();
951 let non_zero_elements = x.iter().filter(|&&val| val != 0.0).count();
952 Ok(non_zero_elements as f64 / total_elements as f64)
953 }
954
955 fn estimate_feature_correlation(&self, x: &ArrayView2<f64>) -> Result<f64> {
957 let (_, n_features) = x.dim();
958
959 let max_pairs = 100;
961 let actual_pairs = if n_features < 15 {
962 (n_features * (n_features - 1)) / 2
963 } else {
964 max_pairs
965 };
966
967 if actual_pairs == 0 {
968 return Ok(0.0);
969 }
970
971 let mut correlation_sum = 0.0;
972 let step = if n_features > 15 { n_features / 10 } else { 1 };
973
974 let mut pair_count = 0;
975 for i in (0..n_features).step_by(step) {
976 for j in ((i + 1)..n_features).step_by(step) {
977 if pair_count >= max_pairs {
978 break;
979 }
980
981 let col_i = x.column(i);
982 let col_j = x.column(j);
983
984 if let Ok(corr) = self.quick_correlation(&col_i, &col_j) {
985 correlation_sum += corr.abs();
986 pair_count += 1;
987 }
988 }
989 if pair_count >= max_pairs {
990 break;
991 }
992 }
993
994 Ok(if pair_count > 0 {
995 correlation_sum / pair_count as f64
996 } else {
997 0.0
998 })
999 }
1000
1001 fn quick_correlation(
1003 &self,
1004 x: &scirs2_core::ndarray::ArrayView1<f64>,
1005 y: &scirs2_core::ndarray::ArrayView1<f64>,
1006 ) -> Result<f64> {
1007 if x.len() != y.len() || x.len() < 2 {
1008 return Ok(0.0);
1009 }
1010
1011 let n = x.len() as f64;
1012 let mean_x = x.sum() / n;
1013 let mean_y = y.sum() / n;
1014
1015 let mut numerator = 0.0;
1016 let mut sum_sq_x = 0.0;
1017 let mut sum_sq_y = 0.0;
1018
1019 for (&xi, &yi) in x.iter().zip(y.iter()) {
1020 let dx = xi - mean_x;
1021 let dy = yi - mean_y;
1022 numerator += dx * dy;
1023 sum_sq_x += dx * dx;
1024 sum_sq_y += dy * dy;
1025 }
1026
1027 let denominator = (sum_sq_x * sum_sq_y).sqrt();
1028
1029 if denominator < f64::EPSILON {
1030 Ok(0.0)
1031 } else {
1032 Ok((numerator / denominator).max(-1.0).min(1.0))
1033 }
1034 }
1035
    /// Derives a block size that targets roughly 30% of the average node
    /// memory per block, clamped against the matrix dimensions.
    ///
    /// NOTE(review): assumes `nodes` is non-empty (callers check this);
    /// an empty map would make the average NaN.
    fn calculate_optimal_block_size(
        &self,
        x: &ArrayView2<f64>,
        nodes: &HashMap<NodeId, NodeInfo>,
    ) -> Result<(usize, usize)> {
        let (n_samples, n_features) = x.dim();

        let avg_memory_gb =
            nodes.values().map(|node| node.memory_gb).sum::<f64>() / nodes.len() as f64;

        // Budget ~30% of a node's memory per block.
        let memory_per_block_gb = avg_memory_gb * 0.3;
        // 8 bytes per f64 element.
        let elements_per_block = (memory_per_block_gb * 1024.0 * 1024.0 * 1024.0 / 8.0) as usize;
        // Start from a square block, then clamp to fractions of the matrix,
        // with floors to keep blocks from degenerating.
        let block_side = (elements_per_block as f64).sqrt() as usize;
        let block_rows = block_side.min(n_samples / 2).max(100);
        let block_cols = (elements_per_block / block_rows)
            .min(n_features / 2)
            .max(10);

        Ok((block_rows, block_cols))
    }
1061}
1062
/// Point-in-time health snapshot for one worker node.
#[cfg(feature = "distributed")]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct NodeHealth {
    /// Node this snapshot describes.
    pub node_id: NodeId,
    /// Coarse health classification.
    pub status: NodeStatus,
    /// CPU utilization (presumably a 0..1 fraction, matching the
    /// AutoScalingConfig targets — confirm against the producer).
    pub cpu_utilization: f64,
    /// Memory utilization (same scale caveat as `cpu_utilization`).
    pub memory_utilization: f64,
    /// Observed network latency in milliseconds.
    pub network_latency_ms: f64,
    /// Fraction of recent tasks that failed.
    pub error_rate: f64,
    /// Timestamp of the last check, as produced by `current_timestamp()`.
    pub last_check_timestamp: u64,
    /// Health checks failed in a row; reset on a successful check.
    pub consecutive_failures: u32,
    /// Fraction of assigned tasks completed successfully.
    pub task_completion_rate: f64,
}
1086
/// Coarse health classification for a worker node.
#[cfg(feature = "distributed")]
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub enum NodeStatus {
    /// Operating normally.
    Healthy,
    /// Responsive but with degraded performance.
    Degraded,
    /// Under resource pressure; should receive less work.
    Overloaded,
    /// Not responding or consistently erroring.
    Failed,
    /// Finishing current work before removal (presumed — confirm usage).
    Draining,
    /// Administratively excluded from scheduling.
    Disabled,
}
1104
/// Parameters governing automatic cluster scale-up/scale-down.
#[cfg(feature = "distributed")]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AutoScalingConfig {
    /// Master switch for auto-scaling.
    pub enabled: bool,
    /// Lower bound on cluster size.
    pub min_nodes: usize,
    /// Upper bound on cluster size.
    pub max_nodes: usize,
    /// Desired CPU utilization (0..1) the scaler steers toward.
    pub target_cpu_utilization: f64,
    /// Desired memory utilization (0..1) the scaler steers toward.
    pub target_memory_utilization: f64,
    /// Utilization above which nodes are added.
    pub scale_up_threshold: f64,
    /// Utilization below which nodes are removed.
    pub scale_down_threshold: f64,
    /// Minimum seconds between consecutive scaling actions.
    pub cooldown_seconds: u64,
    /// Number of measurements averaged before deciding.
    pub measurement_window: usize,
}
1128
// The struct itself is gated on the "distributed" feature (see its
// definition above); without this matching gate the impl referenced a
// nonexistent type and broke non-distributed builds.
#[cfg(feature = "distributed")]
impl Default for AutoScalingConfig {
    /// Conservative defaults: 1–10 nodes, targeting 70% CPU / 80% memory,
    /// scaling up above 80% and down below 30% utilization.
    fn default() -> Self {
        AutoScalingConfig {
            enabled: true,
            min_nodes: 1,
            max_nodes: 10,
            target_cpu_utilization: 0.7,
            target_memory_utilization: 0.8,
            scale_up_threshold: 0.8,
            scale_down_threshold: 0.3,
            // 5 minutes between scaling actions.
            cooldown_seconds: 300,
            measurement_window: 3,
        }
    }
}
1144
/// Per-node circuit breaker: stops sending work to a node after repeated
/// failures and probes it again after a cooldown.
#[cfg(feature = "distributed")]
#[derive(Debug, Clone)]
pub struct CircuitBreaker {
    // Current breaker state (closed / open / half-open).
    state: CircuitBreakerState,
    // Consecutive failures that trip the breaker open.
    failure_threshold: u32,
    // Failures counted while closed.
    failure_count: u32,
    // Successes required in half-open state to close again.
    success_threshold: u32,
    // Successes counted while half-open.
    success_count: u32,
    // Cooldown before an open breaker allows a probe request.
    timeout_seconds: u64,
    // Timestamp (from `current_timestamp()`) of the last failure.
    last_failure_timestamp: u64,
}
1164
/// States of the circuit-breaker state machine.
#[cfg(feature = "distributed")]
#[derive(Debug, Clone, PartialEq)]
enum CircuitBreakerState {
    /// Normal operation; failures are being counted.
    Closed,
    /// Tripped: requests are rejected until the timeout elapses.
    Open,
    /// Probation: requests allowed while successes are counted.
    HalfOpen,
}
1172
1173#[cfg(feature = "distributed")]
1174impl CircuitBreaker {
    /// Creates a closed breaker that opens after `failure_threshold`
    /// consecutive failures, re-closes after `success_threshold` successes
    /// in half-open state, and waits `timeout_seconds` before probing.
    pub fn new(failure_threshold: u32, success_threshold: u32, timeout_seconds: u64) -> Self {
        CircuitBreaker {
            state: CircuitBreakerState::Closed,
            failure_threshold,
            failure_count: 0,
            success_threshold,
            success_count: 0,
            timeout_seconds,
            last_failure_timestamp: 0,
        }
    }
1187
    /// Returns whether a request may be attempted right now.
    ///
    /// Side effect: an open breaker whose cooldown has elapsed transitions
    /// to half-open (resetting its success counter) and permits one probe.
    pub fn can_execute(&mut self) -> bool {
        // Uses the file-local `current_timestamp()` helper (same clock as
        // `record_failure`).
        let current_time = current_timestamp();

        match self.state {
            CircuitBreakerState::Closed => true,
            CircuitBreakerState::Open => {
                if current_time - self.last_failure_timestamp > self.timeout_seconds {
                    // Cooldown elapsed: move to probation.
                    self.state = CircuitBreakerState::HalfOpen;
                    self.success_count = 0;
                    true
                } else {
                    false
                }
            }
            CircuitBreakerState::HalfOpen => true,
        }
    }
1206
1207 pub fn record_success(&mut self) {
1209 match self.state {
1210 CircuitBreakerState::Closed => {
1211 self.failure_count = 0;
1212 }
1213 CircuitBreakerState::HalfOpen => {
1214 self.success_count += 1;
1215 if self.success_count >= self.success_threshold {
1216 self.state = CircuitBreakerState::Closed;
1217 self.failure_count = 0;
1218 }
1219 }
1220 CircuitBreakerState::Open => {
1221 }
1223 }
1224 }
1225
    /// Records a failed request: trips the breaker open once the failure
    /// threshold is reached, and re-opens immediately on any half-open
    /// probe failure.
    pub fn record_failure(&mut self) {
        // Restart the cooldown window from this failure.
        self.last_failure_timestamp = current_timestamp();

        match self.state {
            CircuitBreakerState::Closed => {
                self.failure_count += 1;
                if self.failure_count >= self.failure_threshold {
                    self.state = CircuitBreakerState::Open;
                }
            }
            CircuitBreakerState::HalfOpen => {
                // A failed probe trips the breaker straight back open.
                self.state = CircuitBreakerState::Open;
                self.failure_count = self.failure_threshold;
            }
            CircuitBreakerState::Open => {
                // Already open: only the timestamp update above matters.
            }
        }
    }
1246
1247 pub fn get_state(&self) -> String {
1249 match self.state {
1250 CircuitBreakerState::Closed => "closed".to_string(),
1251 CircuitBreakerState::Open => "open".to_string(),
1252 CircuitBreakerState::HalfOpen => "half-open".to_string(),
1253 }
1254 }
1255}
1256
/// Coordinator wrapper adding health monitoring, circuit breaking,
/// auto-scaling, and task retries on top of [`DistributedCoordinator`].
#[cfg(feature = "distributed")]
pub struct EnhancedDistributedCoordinator {
    // Underlying scheduler that actually runs tasks.
    base_coordinator: DistributedCoordinator,
    // Latest health snapshot per node.
    node_health: Arc<RwLock<HashMap<NodeId, NodeHealth>>>,
    // Scaling policy applied by the auto-scaling loop.
    auto_scaling_config: AutoScalingConfig,
    // One circuit breaker per node.
    circuit_breakers: Arc<RwLock<HashMap<NodeId, CircuitBreaker>>>,
    // Seconds between health-check rounds.
    health_check_interval: u64,
    // Timestamp of the last scaling action (for cooldown enforcement).
    last_scaling_action: Arc<RwLock<u64>>,
    // Rolling per-node (cpu, memory) utilization samples.
    performance_history: Arc<RwLock<VecDeque<HashMap<NodeId, (f64, f64)>>>>,
    // Tasks awaiting retry, paired with their attempt count.
    retry_queue: Arc<RwLock<VecDeque<(DistributedTask, u32)>>>,
    // Maximum retry attempts per task.
    max_retry_attempts: u32,
}
1279
1280#[cfg(feature = "distributed")]
1281impl EnhancedDistributedCoordinator {
    /// Builds the enhanced coordinator: starts the base coordinator, seeds
    /// per-node health records and circuit breakers, then launches the
    /// health-monitoring, auto-scaling, and retry background loops.
    ///
    /// # Errors
    /// Propagates failures from the base coordinator or from starting any
    /// background loop.
    pub async fn new(
        config: DistributedConfig,
        auto_scaling_config: AutoScalingConfig,
    ) -> Result<Self> {
        let base_coordinator = DistributedCoordinator::new(config).await?;

        let mut node_health = HashMap::new();
        let mut circuit_breakers = HashMap::new();

        // Every configured node starts out healthy with a fresh breaker
        // (3 failures to open, 2 successes to close, 60 s cooldown).
        for node in &base_coordinator.config.nodes {
            node_health.insert(
                node.id.clone(),
                NodeHealth {
                    node_id: node.id.clone(),
                    status: NodeStatus::Healthy,
                    cpu_utilization: 0.0,
                    memory_utilization: 0.0,
                    network_latency_ms: 0.0,
                    error_rate: 0.0,
                    last_check_timestamp: current_timestamp(),
                    consecutive_failures: 0,
                    task_completion_rate: 0.0,
                },
            );

            circuit_breakers.insert(node.id.clone(), CircuitBreaker::new(3, 2, 60));
        }

        let enhanced_coordinator = EnhancedDistributedCoordinator {
            base_coordinator,
            node_health: Arc::new(RwLock::new(node_health)),
            auto_scaling_config,
            circuit_breakers: Arc::new(RwLock::new(circuit_breakers)),
            // Health checks every 30 s.
            health_check_interval: 30,
            last_scaling_action: Arc::new(RwLock::new(0)),
            // Room for 60 utilization samples before reallocation.
            performance_history: Arc::new(RwLock::new(VecDeque::with_capacity(60))),
            retry_queue: Arc::new(RwLock::new(VecDeque::new())),
            max_retry_attempts: 3,
        };

        // Launch the background maintenance loops before returning.
        enhanced_coordinator.start_health_monitoring().await?;
        enhanced_coordinator.start_auto_scaling().await?;
        enhanced_coordinator.start_retry_processor().await?;

        Ok(enhanced_coordinator)
    }
1331
    /// Spawn a detached background task that periodically probes every node
    /// and updates its health record and circuit breaker.
    async fn start_health_monitoring(&self) -> Result<()> {
        // Clone the shared handles so the spawned task owns its own Arcs.
        let node_health = self.node_health.clone();
        let circuit_breakers = self.circuit_breakers.clone();
        let nodes = self.base_coordinator.nodes.clone();
        let interval = self.health_check_interval;

        tokio::spawn(async move {
            let mut health_check_interval =
                tokio::time::interval(tokio::time::Duration::from_secs(interval));

            loop {
                health_check_interval.tick().await;

                // NOTE(review): the nodes read guard stays held for the whole
                // sweep, and the health/breaker write locks are re-acquired per
                // node — acquisition order (nodes -> health -> breakers) is
                // consistent, which avoids lock-order inversions.
                let nodes_guard = nodes.read().await;
                for (node_id, node_info) in nodes_guard.iter() {
                    let health_result = Self::check_node_health(node_info).await;

                    let mut health_guard = node_health.write().await;
                    let mut breakers_guard = circuit_breakers.write().await;

                    if let Some(health) = health_guard.get_mut(node_id) {
                        match health_result {
                            Ok(new_health) => {
                                // Replace the snapshot wholesale, then reset the
                                // failure streak (the probe reports 0 already;
                                // this makes the reset explicit).
                                *health = new_health;
                                health.consecutive_failures = 0;

                                if let Some(breaker) = breakers_guard.get_mut(node_id) {
                                    breaker.record_success();
                                }
                            }
                            Err(_) => {
                                health.consecutive_failures += 1;
                                health.last_check_timestamp = current_timestamp();

                                // Three consecutive failed probes mark the node
                                // failed; fewer only degrade it.
                                health.status = if health.consecutive_failures >= 3 {
                                    NodeStatus::Failed
                                } else {
                                    NodeStatus::Degraded
                                };

                                if let Some(breaker) = breakers_guard.get_mut(node_id) {
                                    breaker.record_failure();
                                }
                            }
                        }
                    }
                }
            }
        });

        Ok(())
    }
1386
    /// Spawn a detached background task that samples node utilization once a
    /// minute, maintains the rolling performance window, and acts on scaling
    /// decisions subject to a cooldown. No-op when auto-scaling is disabled.
    async fn start_auto_scaling(&self) -> Result<()> {
        if !self.auto_scaling_config.enabled {
            return Ok(());
        }

        // Clone the shared handles so the spawned task owns its own Arcs.
        let node_health = self.node_health.clone();
        let performance_history = self.performance_history.clone();
        let last_scaling_action = self.last_scaling_action.clone();
        let config = self.auto_scaling_config.clone();

        tokio::spawn(async move {
            let mut scaling_interval = tokio::time::interval(
                tokio::time::Duration::from_secs(60), // fixed 60 s sampling cadence
            );

            loop {
                scaling_interval.tick().await;

                // Snapshot (cpu, memory) utilization for nodes that can take
                // work; Failed nodes are excluded from the sample.
                let health_guard = node_health.read().await;
                let mut current_metrics = HashMap::new();

                for (node_id, health) in health_guard.iter() {
                    if health.status == NodeStatus::Healthy || health.status == NodeStatus::Degraded
                    {
                        current_metrics.insert(
                            node_id.clone(),
                            (health.cpu_utilization, health.memory_utilization),
                        );
                    }
                }
                drop(health_guard);

                // Append the sample and cap the window at measurement_window.
                let mut history_guard = performance_history.write().await;
                history_guard.push_back(current_metrics.clone());
                if history_guard.len() > config.measurement_window {
                    history_guard.pop_front();
                }

                // Only decide once a full window of samples is available.
                if history_guard.len() >= config.measurement_window {
                    let scaling_decision = Self::make_scaling_decision(&*history_guard, &config);

                    if let Some(action) = scaling_decision {
                        let last_action_guard = last_scaling_action.read().await;
                        let current_time = current_timestamp();

                        // Cooldown gate: skip if the last action was too recent.
                        if current_time - *last_action_guard > config.cooldown_seconds {
                            // NOTE(review): read guard is dropped before the
                            // write lock below; racy in general, but only this
                            // single task ever writes the timestamp.
                            drop(last_action_guard);

                            // Placeholder actions: real node provisioning /
                            // decommissioning is not implemented here.
                            match action {
                                ScalingAction::ScaleUp => {
                                    println!("Auto-scaling: Scaling up cluster");
                                }
                                ScalingAction::ScaleDown => {
                                    println!("Auto-scaling: Scaling down cluster");
                                }
                            }

                            let mut last_action_guard = last_scaling_action.write().await;
                            *last_action_guard = current_time;
                        }
                    }
                }
                drop(history_guard);
            }
        });

        Ok(())
    }
1461
    /// Spawn a detached background task that drains the retry queue every
    /// 10 seconds, discarding tasks that exceeded the retry budget.
    ///
    /// NOTE(review): this is a placeholder — eligible tasks are only logged,
    /// never actually resubmitted, and they are not pushed back onto the
    /// queue, so each queued task survives at most one drain pass.
    async fn start_retry_processor(&self) -> Result<()> {
        let retry_queue = self.retry_queue.clone();
        let max_attempts = self.max_retry_attempts;

        tokio::spawn(async move {
            let mut retry_interval = tokio::time::interval(
                tokio::time::Duration::from_secs(10), // fixed drain cadence
            );

            loop {
                retry_interval.tick().await;

                // Drain the whole queue under one write lock, splitting tasks
                // into "retry" and "give up" buckets.
                let mut queue_guard = retry_queue.write().await;
                let mut tasks_to_retry = Vec::new();

                while let Some((task, retry_count)) = queue_guard.pop_front() {
                    if retry_count < max_attempts {
                        tasks_to_retry.push((task, retry_count));
                    } else {
                        println!(
                            "Task {:?} exceeded maximum retry attempts",
                            Self::get_task_id(&task)
                        );
                    }
                }
                // Release the lock before the (potentially slow) retry loop.
                drop(queue_guard);

                for (task, retry_count) in tasks_to_retry {
                    println!(
                        "Retrying task {:?} (attempt {})",
                        Self::get_task_id(&task),
                        retry_count + 1
                    );

                }
            }
        });

        Ok(())
    }
1507
1508 async fn check_node_health(_nodeinfo: &NodeInfo) -> Result<NodeHealth> {
1510 tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
1512
1513 use scirs2_core::random::Rng;
1515 let mut rng = scirs2_core::random::rng();
1516
1517 Ok(NodeHealth {
1518 node_id: _nodeinfo.id.clone(),
1519 status: NodeStatus::Healthy,
1520 cpu_utilization: rng.random_range(0.1..0.9),
1521 memory_utilization: rng.random_range(0.2..0.8),
1522 network_latency_ms: rng.random_range(1.0..50.0),
1523 error_rate: rng.random_range(0.0..0.05),
1524 last_check_timestamp: current_timestamp(),
1525 consecutive_failures: 0,
1526 task_completion_rate: rng.random_range(10.0..100.0),
1527 })
1528 }
1529
1530 fn make_scaling_decision(
1532 history: &VecDeque<HashMap<NodeId, (f64, f64)>>,
1533 config: &AutoScalingConfig,
1534 ) -> Option<ScalingAction> {
1535 if history.len() < config.measurement_window {
1536 return None;
1537 }
1538
1539 let mut total_cpu = 0.0;
1541 let mut total_memory = 0.0;
1542 let mut measurement_count = 0;
1543
1544 for metrics in history {
1545 for (_, (cpu, memory)) in metrics {
1546 total_cpu += cpu;
1547 total_memory += memory;
1548 measurement_count += 1;
1549 }
1550 }
1551
1552 if measurement_count == 0 {
1553 return None;
1554 }
1555
1556 let avg_cpu = total_cpu / measurement_count as f64;
1557 let avg_memory = total_memory / measurement_count as f64;
1558 let max_utilization = avg_cpu.max(avg_memory);
1559
1560 if max_utilization > config.scale_up_threshold {
1562 Some(ScalingAction::ScaleUp)
1563 } else if max_utilization < config.scale_down_threshold {
1564 Some(ScalingAction::ScaleDown)
1565 } else {
1566 None
1567 }
1568 }
1569
1570 fn get_task_id(task: &DistributedTask) -> &str {
1572 match task {
1573 DistributedTask::Fit { task_id, .. } => task_id,
1574 DistributedTask::Transform { task_id, .. } => task_id,
1575 DistributedTask::Aggregate { task_id, .. } => task_id,
1576 }
1577 }
1578
1579 pub async fn submit_task_with_fault_tolerance(&self, task: DistributedTask) -> Result<()> {
1581 let health_guard = self.node_health.read().await;
1583 let healthy_nodes: Vec<_> = health_guard
1584 .values()
1585 .filter(|h| h.status == NodeStatus::Healthy || h.status == NodeStatus::Degraded)
1586 .collect();
1587
1588 if healthy_nodes.is_empty() {
1589 return Err(TransformError::DistributedError(
1590 "No healthy nodes available for task execution".to_string(),
1591 ));
1592 }
1593 drop(health_guard);
1594
1595 let result = self.try_submit_with_circuit_breaker(task.clone()).await;
1597
1598 match result {
1599 Ok(_) => Ok(()),
1600 Err(_) => {
1601 let mut retry_queue_guard = self.retry_queue.write().await;
1603 retry_queue_guard.push_back((task, 0));
1604 Ok(())
1605 }
1606 }
1607 }
1608
1609 async fn try_submit_with_circuit_breaker(&self, task: DistributedTask) -> Result<()> {
1611 let mut breakers_guard = self.circuit_breakers.write().await;
1612
1613 for (node_id, breaker) in breakers_guard.iter_mut() {
1615 if breaker.can_execute() {
1616 let result = self.base_coordinator.submit_task(task.clone()).await;
1618
1619 match result {
1620 Ok(_) => {
1621 breaker.record_success();
1622 return Ok(());
1623 }
1624 Err(_e) => {
1625 breaker.record_failure();
1626 continue;
1627 }
1628 }
1629 }
1630 }
1631
1632 Err(TransformError::DistributedError(
1633 "All circuit breakers are open".to_string(),
1634 ))
1635 }
1636
1637 pub async fn get_cluster_health(&self) -> ClusterHealthSummary {
1639 let health_guard = self.node_health.read().await;
1640 let breakers_guard = self.circuit_breakers.read().await;
1641
1642 let mut healthy_nodes = 0;
1643 let mut degraded_nodes = 0;
1644 let mut failed_nodes = 0;
1645 let mut total_cpu_utilization = 0.0;
1646 let mut total_memory_utilization = 0.0;
1647 let mut open_circuit_breakers = 0;
1648
1649 for (node_id, health) in health_guard.iter() {
1650 match health.status {
1651 NodeStatus::Healthy => healthy_nodes += 1,
1652 NodeStatus::Degraded => degraded_nodes += 1,
1653 NodeStatus::Failed => failed_nodes += 1,
1654 NodeStatus::Overloaded => failed_nodes += 1, NodeStatus::Draining => degraded_nodes += 1, NodeStatus::Disabled => failed_nodes += 1, }
1658
1659 total_cpu_utilization += health.cpu_utilization;
1660 total_memory_utilization += health.memory_utilization;
1661
1662 if let Some(breaker) = breakers_guard.get(node_id) {
1663 if breaker.get_state() == "open" {
1664 open_circuit_breakers += 1;
1665 }
1666 }
1667 }
1668
1669 let total_nodes = health_guard.len();
1670
1671 ClusterHealthSummary {
1672 total_nodes,
1673 healthy_nodes,
1674 degraded_nodes,
1675 failed_nodes,
1676 average_cpu_utilization: if total_nodes > 0 {
1677 total_cpu_utilization / total_nodes as f64
1678 } else {
1679 0.0
1680 },
1681 average_memory_utilization: if total_nodes > 0 {
1682 total_memory_utilization / total_nodes as f64
1683 } else {
1684 0.0
1685 },
1686 open_circuit_breakers,
1687 auto_scaling_enabled: self.auto_scaling_config.enabled,
1688 }
1689 }
1690}
1691
#[cfg(feature = "distributed")]
/// Direction of an auto-scaling adjustment chosen by
/// `EnhancedDistributedCoordinator::make_scaling_decision`.
#[derive(Debug, Clone)]
enum ScalingAction {
    /// Add capacity: sustained utilization above the scale-up threshold.
    ScaleUp,
    /// Remove capacity: sustained utilization below the scale-down threshold.
    ScaleDown,
}
1699
#[cfg(feature = "distributed")]
/// Point-in-time snapshot of cluster health, produced by
/// `EnhancedDistributedCoordinator::get_cluster_health`.
#[derive(Debug, Clone)]
pub struct ClusterHealthSummary {
    /// Number of nodes tracked by the coordinator.
    pub total_nodes: usize,
    /// Nodes currently in the `Healthy` state.
    pub healthy_nodes: usize,
    /// Nodes in the `Degraded` or `Draining` state.
    pub degraded_nodes: usize,
    /// Nodes in the `Failed`, `Overloaded`, or `Disabled` state.
    pub failed_nodes: usize,
    /// Mean CPU utilization across all tracked nodes (0.0 when none).
    pub average_cpu_utilization: f64,
    /// Mean memory utilization across all tracked nodes (0.0 when none).
    pub average_memory_utilization: f64,
    /// Count of circuit breakers currently in the open state.
    pub open_circuit_breakers: usize,
    /// Whether auto-scaling was enabled in the coordinator's config.
    pub auto_scaling_enabled: bool,
}
1721
/// Current wall-clock time as whole seconds since the Unix epoch.
/// Falls back to 0 if the system clock reads before the epoch.
#[allow(dead_code)]
fn current_timestamp() -> u64 {
    use std::time::{SystemTime, UNIX_EPOCH};

    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map(|elapsed| elapsed.as_secs())
        .unwrap_or(0)
}
1729
#[cfg(not(feature = "distributed"))]
/// Stub substituted for the real config when the `distributed` feature is off.
pub struct DistributedConfig;
1733
#[cfg(not(feature = "distributed"))]
/// Stub substituted for the real coordinator when the `distributed` feature is off.
pub struct DistributedCoordinator;
1736
#[cfg(not(feature = "distributed"))]
/// Stub substituted for distributed PCA when the `distributed` feature is off.
pub struct DistributedPCA;
1739
1740#[cfg(not(feature = "distributed"))]
1741impl DistributedPCA {
1742 pub async fn new(_n_components: usize, config: DistributedConfig) -> Result<Self> {
1743 Err(TransformError::FeatureNotEnabled(
1744 "Distributed processing requires the 'distributed' feature to be enabled".to_string(),
1745 ))
1746 }
1747}