1#![allow(dead_code)]
9use crate::distributed_monitoring::DistributedMonitor;
10use crate::{TorshDistributedError, TorshResult};
11use serde::{Deserialize, Serialize};
12use std::collections::{HashMap, VecDeque};
13use std::sync::{Arc, Mutex, RwLock};
14use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
15use tracing::info;
16
/// How memory budgets are distributed across cluster nodes.
///
/// NOTE(review): within this module only the `Dynamic` default is ever
/// constructed; variant/field semantics below are indicative, inferred from
/// names — confirm against the strategy's consumers.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub enum MemoryAllocationStrategy {
    /// Fixed per-node budget in MB.
    Static { allocation_per_node_mb: u64 },
    /// Steers allocations toward a target utilization fraction, presumably
    /// adjusting by `adjustment_factor` per step.
    Dynamic {
        target_utilization: f32,
        adjustment_factor: f32,
    },
    /// Rebalances once utilization spread exceeds the threshold.
    Balanced { rebalance_threshold: f32 },
    /// Weights allocations by named priority classes (name → weight).
    Priority {
        priority_weights: HashMap<String, f32>,
    },
    /// Guaranteed base budget plus a bounded overflow area, both in MB.
    Elastic {
        base_allocation_mb: u64,
        max_overflow_mb: u64,
    },
    /// Learns allocation behavior online over a sliding sample window.
    Adaptive {
        learning_rate: f32,
        adaptation_window: usize,
    },
}
44
45impl Default for MemoryAllocationStrategy {
46 fn default() -> Self {
47 Self::Dynamic {
48 target_utilization: 0.8,
49 adjustment_factor: 0.1,
50 }
51 }
52}
53
/// Memory-saving techniques the optimizer can propose. In this module the
/// parameters feed only `estimate_technique_benefits` and
/// `estimate_execution_duration`; descriptions are indicative of intent.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub enum MemoryOptimizationTechnique {
    /// Accumulate gradients over several steps before applying them.
    GradientAccumulation { accumulation_steps: u32 },
    /// Recompute a fraction of activations instead of storing them.
    ActivationCheckpointing { checkpoint_ratio: f32 },
    /// Spill to host memory once utilization crosses the threshold.
    CpuOffloading { offload_threshold: f32 },
    /// Memory-mapped storage with the given page size (bytes, presumably).
    MemoryMapping { page_size: usize },
    /// Compress stored activations by the given ratio.
    ActivationCompression { compression_ratio: f32 },
    /// Periodic garbage collection once utilization crosses `gc_threshold`.
    SmartGC {
        gc_threshold: f32,
        gc_interval: Duration,
    },
    /// Pool memory across nodes; also used by the balancer to express a
    /// cross-node transfer of `pool_size_mb`.
    CrossNodePooling { pool_size_mb: u64 },
    /// Tiered memory with explicit level descriptions.
    HierarchicalMemory { levels: Vec<MemoryLevel> },
}
77
/// One tier in a hierarchical memory configuration
/// (see `MemoryOptimizationTechnique::HierarchicalMemory`).
/// Not consumed by any code visible in this module.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct MemoryLevel {
    /// Human-readable tier name.
    pub name: String,
    /// Tier capacity in MB.
    pub capacity_mb: u64,
    /// Access latency in microseconds.
    pub latency_us: u64,
    /// Bandwidth in MB/s.
    pub bandwidth_mbps: f32,
    /// Relative cost weight for placement decisions — semantics defined by
    /// the consumer; verify before relying on it.
    pub cost_factor: f32,
}
92
/// Point-in-time memory statistics for a single node, produced by
/// `DistributedMemoryOptimizer::extract_memory_stats`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct NodeMemoryStats {
    /// Node identifier, taken from the monitor's metrics.
    pub node_id: String,
    /// Total capacity in MB (currently a hard-coded placeholder upstream).
    pub total_memory_mb: u64,
    /// Currently allocated MB.
    pub allocated_memory_mb: u64,
    /// Peak allocated MB observed for this node.
    pub peak_memory_mb: u64,
    /// `total - allocated`, saturating at zero.
    pub free_memory_mb: u64,
    /// allocated / total as a percentage (0–100).
    pub utilization_percent: f32,
    /// 0.0 below 80% utilization, ramping linearly to 1.0 at 90%+.
    pub pressure_score: f32,
    /// Heuristic fragmentation estimate derived from utilization.
    pub fragmentation: f32,
    /// Heuristic failure count for this sample (1 when pressure > 0.9).
    pub allocation_failures: u32,
    /// Approximate allocation rate, derived from training throughput.
    pub allocation_rate_mbps: f32,
    /// Approximate deallocation rate, derived from training throughput.
    pub deallocation_rate_mbps: f32,
    /// Sample time, milliseconds since the UNIX epoch.
    pub timestamp_ms: u64,
}
121
/// A planned (or in-flight) memory optimization targeting one node.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MemoryOptimizationAction {
    /// Unique identifier (timestamp-derived by the generators).
    pub id: String,
    /// Node the optimization applies to.
    pub target_node: String,
    /// Technique to apply, with its parameters.
    pub technique: MemoryOptimizationTechnique,
    /// Estimated savings in MB; compared against realized savings for the
    /// efficiency metric.
    pub expected_savings_mb: u64,
    /// Scheduling priority — higher values sort first.
    pub priority: u32,
    /// Rough wall-clock estimate for applying the technique.
    pub estimated_duration: Duration,
    /// Current lifecycle state.
    pub status: OptimizationStatus,
    /// Creation time, milliseconds since the UNIX epoch.
    pub created_at: u64,
}
142
/// Lifecycle of a `MemoryOptimizationAction`, from creation to a terminal
/// state.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub enum OptimizationStatus {
    /// Created but not yet started.
    Pending,
    /// Currently running; `progress` is a fraction in [0, 1].
    Executing { progress: f32 },
    /// Finished successfully, with realized savings and elapsed time.
    Completed {
        actual_savings_mb: u64,
        duration_ms: u64,
    },
    /// Terminated with an error message.
    Failed { error: String },
    /// Cancelled before completion, with the reason.
    Cancelled { reason: String },
}
160
161impl std::fmt::Display for OptimizationStatus {
162 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
163 match self {
164 OptimizationStatus::Pending => write!(f, "Pending"),
165 OptimizationStatus::Executing { progress } => {
166 write!(f, "Executing ({:.1}%)", progress * 100.0)
167 }
168 OptimizationStatus::Completed {
169 actual_savings_mb,
170 duration_ms,
171 } => write!(
172 f,
173 "Completed (saved {}MB in {}ms)",
174 actual_savings_mb, duration_ms
175 ),
176 OptimizationStatus::Failed { error } => write!(f, "Failed: {}", error),
177 OptimizationStatus::Cancelled { reason } => write!(f, "Cancelled: {}", reason),
178 }
179 }
180}
181
/// Tunables for the distributed memory optimizer.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MemoryOptimizationConfig {
    /// Cluster-wide allocation strategy.
    pub allocation_strategy: MemoryAllocationStrategy,
    /// Techniques the optimizer may propose for high-pressure nodes.
    pub enabled_techniques: Vec<MemoryOptimizationTechnique>,
    /// Pressure score (0–1) at or above which a node receives actions.
    pub pressure_threshold: f32,
    /// Minimum time between optimization analysis rounds.
    pub optimization_interval: Duration,
    /// Upper bound on simultaneously active optimizations.
    pub max_concurrent_optimizations: usize,
    /// Intended statistics-collection cadence (not read in this module's
    /// visible code — presumably consumed by the caller's loop).
    pub stats_collection_interval: Duration,
    /// Capacity cap for both the memory-snapshot and optimization histories.
    pub history_retention_size: usize,
    /// Enables cross-node utilization balancing.
    pub enable_cross_node_balancing: bool,
    /// Enables trend-based predictive optimizations.
    pub enable_predictive_management: bool,
    /// Prediction horizon (not read in this module's visible code).
    pub prediction_window: Duration,
}
206
207impl Default for MemoryOptimizationConfig {
208 fn default() -> Self {
209 Self {
210 allocation_strategy: MemoryAllocationStrategy::default(),
211 enabled_techniques: vec![
212 MemoryOptimizationTechnique::GradientAccumulation {
213 accumulation_steps: 4,
214 },
215 MemoryOptimizationTechnique::ActivationCheckpointing {
216 checkpoint_ratio: 0.5,
217 },
218 MemoryOptimizationTechnique::SmartGC {
219 gc_threshold: 0.8,
220 gc_interval: Duration::from_secs(30),
221 },
222 ],
223 pressure_threshold: 0.85,
224 optimization_interval: Duration::from_secs(10),
225 max_concurrent_optimizations: 2,
226 stats_collection_interval: Duration::from_secs(5),
227 history_retention_size: 1000,
228 enable_cross_node_balancing: true,
229 enable_predictive_management: true,
230 prediction_window: Duration::from_secs(60),
231 }
232 }
233}
234
/// Coordinates memory-statistics collection, optimization planning, and
/// (simulated) execution across the nodes reported by a `DistributedMonitor`.
/// All shared state is behind `Arc`s so the optimizer can be used from
/// multiple threads.
pub struct DistributedMemoryOptimizer {
    /// Behavior tunables; see `MemoryOptimizationConfig`.
    config: MemoryOptimizationConfig,
    /// Source of per-node metrics.
    monitor: Arc<DistributedMonitor>,
    /// Latest memory statistics per node id.
    node_memory_stats: Arc<RwLock<HashMap<String, NodeMemoryStats>>>,
    /// Bounded history of whole-cluster statistic snapshots.
    memory_history: Arc<Mutex<VecDeque<HashMap<String, NodeMemoryStats>>>>,
    /// Optimizations currently admitted for execution, keyed by action id.
    active_optimizations: Arc<RwLock<HashMap<String, MemoryOptimizationAction>>>,
    /// Bounded history of finished optimizations.
    optimization_history: Arc<Mutex<VecDeque<MemoryOptimizationAction>>>,
    /// Per-node allocation bookkeeping.
    allocation_tracker: Arc<Mutex<AllocationTracker>>,
    /// Usage-forecasting state.
    memory_predictor: Arc<Mutex<MemoryPredictor>>,
    /// Cross-node balancing state.
    memory_balancer: Arc<Mutex<MemoryBalancer>>,
    /// End of the last optimization round; rate-limits the next analysis.
    last_optimization: Arc<Mutex<Instant>>,
}
258
/// Records allocation requests per node and derives simple usage statistics
/// (mean request size and request frequency) from a rolling window.
#[derive(Debug)]
struct AllocationTracker {
    // Rolling window of the most recent requests per node (capped at 1000).
    allocation_requests: HashMap<String, VecDeque<AllocationRequest>>,
    // Cumulative MB successfully allocated per node.
    total_allocated: HashMap<String, u64>,
    // Derived statistics; created once a node has at least 10 requests.
    allocation_patterns: HashMap<String, AllocationPattern>,
}

/// A single observed allocation attempt.
#[derive(Debug, Clone)]
struct AllocationRequest {
    size_mb: u64,
    timestamp: Instant,
    allocation_type: String,
    success: bool,
}

/// Aggregated allocation statistics for one node.
#[derive(Debug, Clone)]
struct AllocationPattern {
    // Mean request size in MB over the current window.
    avg_allocation_mb: f64,
    // Never updated by the tracker; reserved for future use.
    peak_rate_mbps: f32,
    // Requests per minute over the current window.
    allocation_frequency: f32,
    // Hour-of-day buckets; currently never populated.
    hourly_patterns: [f32; 24],
    last_update: Instant,
}

impl AllocationTracker {
    /// Creates an empty tracker.
    fn new() -> Self {
        Self {
            allocation_requests: HashMap::new(),
            total_allocated: HashMap::new(),
            allocation_patterns: HashMap::new(),
        }
    }

    /// Records one allocation attempt for `node_id` and refreshes the node's
    /// derived pattern. Failed attempts stay in the window but do not count
    /// toward `total_allocated`.
    fn track_allocation(
        &mut self,
        node_id: &str,
        size_mb: u64,
        allocation_type: String,
        success: bool,
    ) {
        let window = self
            .allocation_requests
            .entry(node_id.to_string())
            .or_default();
        window.push_back(AllocationRequest {
            size_mb,
            timestamp: Instant::now(),
            allocation_type,
            success,
        });
        // Keep only the 1000 most recent requests.
        while window.len() > 1000 {
            window.pop_front();
        }

        if success {
            *self.total_allocated.entry(node_id.to_string()).or_insert(0) += size_mb;
        }

        self.update_allocation_pattern(node_id);
    }

    /// Recomputes the derived statistics for `node_id`. A node needs at
    /// least ten recorded requests before a pattern is created.
    fn update_allocation_pattern(&mut self, node_id: &str) {
        let window = match self.allocation_requests.get(node_id) {
            Some(w) if w.len() >= 10 => w,
            _ => return,
        };

        let entry = self
            .allocation_patterns
            .entry(node_id.to_string())
            .or_insert_with(|| AllocationPattern {
                avg_allocation_mb: 0.0,
                peak_rate_mbps: 0.0,
                allocation_frequency: 0.0,
                hourly_patterns: [0.0; 24],
                last_update: Instant::now(),
            });

        // Mean request size over the window.
        let window_total: u64 = window.iter().map(|r| r.size_mb).sum();
        entry.avg_allocation_mb = window_total as f64 / window.len() as f64;

        // Requests per minute, measured between the oldest and newest entry.
        if let (Some(oldest), Some(newest)) = (window.front(), window.back()) {
            let span_min = newest.timestamp.duration_since(oldest.timestamp).as_secs_f32() / 60.0;
            if span_min > 0.0 {
                entry.allocation_frequency = window.len() as f32 / span_min;
            }
        }

        entry.last_update = Instant::now();
    }

    /// Rough forecast of MB that `node_id` will request over the next
    /// `lookahead_minutes`, based on observed frequency and mean size.
    /// Returns 0 for nodes without an established pattern.
    fn get_allocation_prediction(&self, node_id: &str, lookahead_minutes: u32) -> u64 {
        match self.allocation_patterns.get(node_id) {
            Some(p) => {
                let expected_requests = p.allocation_frequency * lookahead_minutes as f32;
                (expected_requests * p.avg_allocation_mb as f32) as u64
            }
            None => 0,
        }
    }
}
386
/// Per-node memory-usage forecasting: rolling usage windows, linear
/// regression models, and short-term trend fits.
#[derive(Debug)]
struct MemoryPredictor {
    /// Rolling window of utilization samples per node (capped at 200).
    usage_patterns: HashMap<String, VecDeque<f32>>,
    /// Latest short-term trend fit per node.
    trend_analysis: HashMap<String, TrendData>,
    /// Least-squares model per node (epoch seconds → utilization percent).
    prediction_models: HashMap<String, LinearPredictor>,
}
397
/// Result of a short-term linear fit over recent utilization samples.
#[derive(Debug, Clone)]
struct TrendData {
    /// Fitted slope over the recent-sample window (per-sample units).
    slope: f32,
    /// R² of the fit, clamped to [0, 1].
    confidence: f32,
    /// Reserved for seasonal decomposition; never populated in this module.
    seasonal_patterns: Vec<f32>,
    /// When the fit was last refreshed.
    last_update: Instant,
}
410
/// Ordinary least-squares predictor over (time, value) samples.
#[derive(Debug)]
struct LinearPredictor {
    // Sliding window of (x = time, y = value) samples, capped at 100.
    data_points: VecDeque<(f32, f32)>,
    slope: f32,
    intercept: f32,
    // R² of the last fit, clamped to [0, 1]; 0 until the first training.
    accuracy: f32,
    last_training: Instant,
}

impl LinearPredictor {
    /// Creates an untrained predictor.
    fn new() -> Self {
        Self {
            data_points: VecDeque::with_capacity(100),
            slope: 0.0,
            intercept: 0.0,
            accuracy: 0.0,
            last_training: Instant::now(),
        }
    }

    /// Appends one sample, evicting the oldest beyond 100, and retrains at
    /// most once per minute once at least 20 samples are available.
    fn add_data_point(&mut self, time: f32, value: f32) {
        self.data_points.push_back((time, value));
        while self.data_points.len() > 100 {
            self.data_points.pop_front();
        }

        let enough_data = self.data_points.len() >= 20;
        let cooled_down = self.last_training.elapsed().as_secs() >= 60;
        if enough_data && cooled_down {
            self.train();
        }
    }

    /// Fits slope and intercept by least squares and records the fit's R².
    /// A near-singular design (all x equal) leaves the model unchanged.
    fn train(&mut self) {
        if self.data_points.len() < 2 {
            return;
        }

        let n = self.data_points.len() as f32;
        let (mut sx, mut sy, mut sxy, mut sx2) = (0.0f32, 0.0f32, 0.0f32, 0.0f32);
        for &(x, y) in &self.data_points {
            sx += x;
            sy += y;
            sxy += x * y;
            sx2 += x * x;
        }

        let denom = n * sx2 - sx * sx;
        if denom.abs() > 0.001 {
            self.slope = (n * sxy - sx * sy) / denom;
            self.intercept = (sy - self.slope * sx) / n;

            // R² = 1 - SS_res / SS_tot, guarded against a degenerate SS_tot.
            let mean_y = sy / n;
            let ss_tot: f32 = self
                .data_points
                .iter()
                .map(|(_, y)| (y - mean_y).powi(2))
                .sum();
            let ss_res: f32 = self
                .data_points
                .iter()
                .map(|(x, y)| (y - (self.slope * x + self.intercept)).powi(2))
                .sum();

            let r2 = if ss_tot > 0.001 { 1.0 - ss_res / ss_tot } else { 0.0 };
            self.accuracy = r2.clamp(0.0, 1.0);
        }

        self.last_training = Instant::now();
    }

    /// Predicts the value at `future_time`. Falls back to the sample mean
    /// when the fit is weak (R² < 0.5), and to 0.0 with no samples at all.
    fn predict(&self, future_time: f32) -> f32 {
        if self.accuracy >= 0.5 {
            return self.slope * future_time + self.intercept;
        }
        if self.data_points.is_empty() {
            return 0.0;
        }
        let total: f32 = self.data_points.iter().map(|(_, y)| y).sum();
        total / self.data_points.len() as f32
    }
}
503
504impl MemoryPredictor {
505 fn new() -> Self {
506 Self {
507 usage_patterns: HashMap::new(),
508 trend_analysis: HashMap::new(),
509 prediction_models: HashMap::new(),
510 }
511 }
512
513 fn update_memory_usage(&mut self, node_id: &str, usage_percent: f32) {
514 let pattern = self.usage_patterns.entry(node_id.to_string()).or_default();
516 pattern.push_back(usage_percent);
517 if pattern.len() > 200 {
518 pattern.pop_front();
519 }
520
521 let current_time = SystemTime::now()
523 .duration_since(UNIX_EPOCH)
524 .expect("system time should be after UNIX_EPOCH")
525 .as_secs_f32();
526
527 let model = self
528 .prediction_models
529 .entry(node_id.to_string())
530 .or_insert_with(LinearPredictor::new);
531 model.add_data_point(current_time, usage_percent);
532
533 self.update_trend_analysis(node_id, usage_percent);
535 }
536
537 fn update_trend_analysis(&mut self, node_id: &str, _current_usage: f32) {
538 let pattern = match self.usage_patterns.get(node_id) {
539 Some(pattern) => pattern,
540 None => return,
541 };
542
543 if pattern.len() < 10 {
544 return;
545 }
546
547 let trend = self
548 .trend_analysis
549 .entry(node_id.to_string())
550 .or_insert_with(|| TrendData {
551 slope: 0.0,
552 confidence: 0.0,
553 seasonal_patterns: Vec::new(),
554 last_update: Instant::now(),
555 });
556
557 let recent_points: Vec<f32> = pattern.iter().rev().take(20).cloned().collect();
559 if recent_points.len() >= 10 {
560 let n = recent_points.len() as f32;
561 let x_values: Vec<f32> = (0..recent_points.len()).map(|i| i as f32).collect();
562
563 let sum_x: f32 = x_values.iter().sum();
564 let sum_y: f32 = recent_points.iter().sum();
565 let sum_xy: f32 = x_values
566 .iter()
567 .zip(recent_points.iter())
568 .map(|(x, y)| x * y)
569 .sum();
570 let sum_x2: f32 = x_values.iter().map(|x| x * x).sum();
571
572 let denominator = n * sum_x2 - sum_x * sum_x;
573 if denominator.abs() > 0.001 {
574 trend.slope = (n * sum_xy - sum_x * sum_y) / denominator;
575
576 let mean_y = sum_y / n;
578 let ss_tot: f32 = recent_points.iter().map(|y| (y - mean_y).powi(2)).sum();
579 let predicted: Vec<f32> = x_values
580 .iter()
581 .map(|&x| trend.slope * x + (sum_y - trend.slope * sum_x) / n)
582 .collect();
583 let ss_res: f32 = recent_points
584 .iter()
585 .zip(predicted.iter())
586 .map(|(actual, pred)| (actual - pred).powi(2))
587 .sum();
588
589 trend.confidence = if ss_tot > 0.001 {
590 1.0 - (ss_res / ss_tot)
591 } else {
592 0.0
593 };
594 trend.confidence = trend.confidence.clamp(0.0, 1.0);
595 }
596 }
597
598 trend.last_update = Instant::now();
599 }
600
601 fn predict_memory_usage(&self, node_id: &str, minutes_ahead: u32) -> Option<f32> {
602 let model = self.prediction_models.get(node_id)?;
603 let current_time = SystemTime::now()
604 .duration_since(UNIX_EPOCH)
605 .expect("system time should be after UNIX_EPOCH")
606 .as_secs_f32();
607 let future_time = current_time + (minutes_ahead as f32 * 60.0);
608
609 Some(model.predict(future_time).clamp(0.0, 100.0))
610 }
611
612 fn get_trend_analysis(&self, node_id: &str) -> Option<&TrendData> {
613 self.trend_analysis.get(node_id)
614 }
615}
616
/// Detects utilization imbalance between nodes and proposes cross-node
/// memory transfers.
#[derive(Debug)]
struct MemoryBalancer {
    /// Utilization spread (percentage points) that triggers balancing.
    imbalance_threshold: f32,
    /// Time of the last balancing attempt; used for 30-second rate limiting.
    last_balancing: Instant,
    /// Recent balancing operations (capped at 100).
    balancing_history: VecDeque<BalancingOperation>,
}
627
/// Record of one proposed cross-node memory transfer.
#[derive(Debug, Clone)]
struct BalancingOperation {
    /// Node memory is moved away from (the most utilized).
    source_node: String,
    /// Node receiving the headroom (the least utilized).
    target_node: String,
    /// Proposed transfer amount in MB.
    transfer_amount_mb: u64,
    /// When the operation was proposed.
    timestamp: Instant,
    /// Recorded as `true` at proposal time; no completion feedback exists
    /// in this module.
    success: bool,
}
642
643impl MemoryBalancer {
644 fn new(imbalance_threshold: f32) -> Self {
645 Self {
646 imbalance_threshold,
647 last_balancing: Instant::now(),
648 balancing_history: VecDeque::with_capacity(100),
649 }
650 }
651
652 fn check_and_balance(
653 &mut self,
654 node_stats: &HashMap<String, NodeMemoryStats>,
655 ) -> Vec<MemoryOptimizationAction> {
656 let mut actions = Vec::new();
657
658 if self.last_balancing.elapsed().as_secs() < 30 {
660 return actions;
661 }
662
663 let mut utilizations: Vec<(String, f32)> = node_stats
664 .iter()
665 .map(|(node_id, stats)| (node_id.clone(), stats.utilization_percent))
666 .collect();
667
668 if utilizations.len() < 2 {
669 return actions;
670 }
671
672 utilizations.sort_by(|a, b| a.1.partial_cmp(&b.1).unwrap_or(std::cmp::Ordering::Equal));
673
674 let min_util = utilizations
675 .first()
676 .expect("utilizations should have at least 2 elements")
677 .1;
678 let max_util = utilizations
679 .last()
680 .expect("utilizations should have at least 2 elements")
681 .1;
682
683 if (max_util - min_util) > self.imbalance_threshold {
685 let source_node = utilizations
686 .last()
687 .expect("utilizations should have at least 2 elements")
688 .0
689 .clone();
690 let target_node = utilizations
691 .first()
692 .expect("utilizations should have at least 2 elements")
693 .0
694 .clone();
695
696 let target_util = (max_util + min_util) / 2.0;
698 let source_stats = &node_stats[&source_node];
699 let transfer_mb = ((source_stats.utilization_percent - target_util) / 100.0
700 * source_stats.total_memory_mb as f32) as u64;
701
702 if transfer_mb > 100 {
703 let action = MemoryOptimizationAction {
705 id: format!(
706 "balance_{}_{}",
707 SystemTime::now()
708 .duration_since(UNIX_EPOCH)
709 .expect("time should be after UNIX_EPOCH")
710 .as_millis(),
711 SystemTime::now()
712 .duration_since(UNIX_EPOCH)
713 .expect("time should be after UNIX_EPOCH")
714 .as_nanos()
715 % 1000
716 ),
717 target_node: source_node.clone(),
718 technique: MemoryOptimizationTechnique::CrossNodePooling {
719 pool_size_mb: transfer_mb,
720 },
721 expected_savings_mb: transfer_mb,
722 priority: 3,
723 estimated_duration: Duration::from_secs(10),
724 status: OptimizationStatus::Pending,
725 created_at: SystemTime::now()
726 .duration_since(UNIX_EPOCH)
727 .expect("system time should be after UNIX_EPOCH")
728 .as_millis() as u64,
729 };
730
731 actions.push(action);
732
733 let operation = BalancingOperation {
735 source_node,
736 target_node,
737 transfer_amount_mb: transfer_mb,
738 timestamp: Instant::now(),
739 success: true, };
741
742 self.balancing_history.push_back(operation);
743 if self.balancing_history.len() > 100 {
744 self.balancing_history.pop_front();
745 }
746
747 self.last_balancing = Instant::now();
748 }
749 }
750
751 actions
752 }
753}
754
755impl DistributedMemoryOptimizer {
756 pub fn new(config: MemoryOptimizationConfig, monitor: Arc<DistributedMonitor>) -> Self {
758 Self {
759 config: config.clone(),
760 monitor,
761 node_memory_stats: Arc::new(RwLock::new(HashMap::new())),
762 memory_history: Arc::new(Mutex::new(VecDeque::with_capacity(
763 config.history_retention_size,
764 ))),
765 active_optimizations: Arc::new(RwLock::new(HashMap::new())),
766 optimization_history: Arc::new(Mutex::new(VecDeque::with_capacity(
767 config.history_retention_size,
768 ))),
769 allocation_tracker: Arc::new(Mutex::new(AllocationTracker::new())),
770 memory_predictor: Arc::new(Mutex::new(MemoryPredictor::new())),
771 memory_balancer: Arc::new(Mutex::new(MemoryBalancer::new(20.0))), last_optimization: Arc::new(Mutex::new(Instant::now())),
773 }
774 }
775
776 pub fn collect_memory_statistics(&self) -> TorshResult<()> {
778 if let Some(current_metrics) = self.monitor.get_current_metrics()? {
780 let memory_stats = self.extract_memory_stats(¤t_metrics)?;
781
782 {
784 let mut node_stats = self.node_memory_stats.write().map_err(|e| {
785 TorshDistributedError::communication_error(
786 "memory_stats",
787 format!("Lock error: {}", e),
788 )
789 })?;
790 node_stats.insert(memory_stats.node_id.clone(), memory_stats.clone());
791 }
792
793 {
795 let mut history = self.memory_history.lock().map_err(|e| {
796 TorshDistributedError::communication_error(
797 "memory_history",
798 format!("Lock error: {}", e),
799 )
800 })?;
801
802 let current_snapshot = {
803 let node_stats = self.node_memory_stats.read().map_err(|e| {
804 TorshDistributedError::communication_error(
805 "memory_stats",
806 format!("Lock error: {}", e),
807 )
808 })?;
809 node_stats.clone()
810 };
811
812 history.push_back(current_snapshot);
813 if history.len() > self.config.history_retention_size {
814 history.pop_front();
815 }
816 }
817
818 if self.config.enable_predictive_management {
820 let mut predictor = self.memory_predictor.lock().map_err(|e| {
821 TorshDistributedError::communication_error(
822 "memory_predictor",
823 format!("Lock error: {}", e),
824 )
825 })?;
826 predictor
827 .update_memory_usage(&memory_stats.node_id, memory_stats.utilization_percent);
828 }
829 }
830
831 Ok(())
832 }
833
834 fn extract_memory_stats(
836 &self,
837 metrics: &crate::distributed_monitoring::NodeMetrics,
838 ) -> TorshResult<NodeMemoryStats> {
839 let system_metrics = &metrics.system_metrics;
840
841 let total_memory_mb: u64 = 32000; let allocated_memory_mb = system_metrics.memory_usage_mb;
844 let free_memory_mb = total_memory_mb.saturating_sub(allocated_memory_mb);
845 let utilization_percent = (allocated_memory_mb as f32 / total_memory_mb as f32) * 100.0;
846
847 let pressure_score = if utilization_percent > 90.0 {
849 1.0
850 } else if utilization_percent > 80.0 {
851 (utilization_percent - 80.0) / 10.0
852 } else {
853 0.0
854 };
855
856 let fragmentation = if utilization_percent > 70.0 {
858 (utilization_percent - 70.0) / 30.0 * 0.5
859 } else {
860 0.1
861 };
862
863 Ok(NodeMemoryStats {
864 node_id: metrics.node_id.clone(),
865 total_memory_mb,
866 allocated_memory_mb,
867 peak_memory_mb: allocated_memory_mb.max(allocated_memory_mb), free_memory_mb,
869 utilization_percent,
870 pressure_score,
871 fragmentation,
872 allocation_failures: if pressure_score > 0.9 { 1 } else { 0 },
873 allocation_rate_mbps: metrics.training_metrics.throughput_samples_per_sec * 0.1, deallocation_rate_mbps: metrics.training_metrics.throughput_samples_per_sec * 0.08, timestamp_ms: SystemTime::now()
876 .duration_since(UNIX_EPOCH)
877 .expect("system time should be after UNIX_EPOCH")
878 .as_millis() as u64,
879 })
880 }
881
    /// Scans current node statistics and produces a prioritized list of
    /// optimization actions. Returns an empty list while still inside the
    /// configured `optimization_interval` (simple rate limiting).
    ///
    /// Action sources, in order: per-node technique actions for nodes at or
    /// above the pressure threshold, cross-node balancing (when enabled),
    /// then predictive optimizations (when enabled).
    ///
    /// Lock order: `last_optimization` (released before proceeding), then
    /// the `node_memory_stats` read lock, which stays held while the
    /// balancer and predictor mutexes are taken below.
    pub fn analyze_optimization_opportunities(&self) -> TorshResult<Vec<MemoryOptimizationAction>> {
        let mut actions = Vec::new();

        {
            let last_opt = self.last_optimization.lock().map_err(|e| {
                TorshDistributedError::communication_error(
                    "last_optimization",
                    format!("Lock error: {}", e),
                )
            })?;
            // Too soon since the last round: nothing to propose yet.
            if last_opt.elapsed() < self.config.optimization_interval {
                return Ok(actions);
            }
        }

        let node_stats = self.node_memory_stats.read().map_err(|e| {
            TorshDistributedError::communication_error("node_stats", format!("Lock error: {}", e))
        })?;

        // Technique-based actions for nodes under memory pressure.
        for (node_id, stats) in node_stats.iter() {
            if stats.pressure_score >= self.config.pressure_threshold {
                actions.extend(self.generate_optimization_actions(node_id, stats)?);
            }
        }

        if self.config.enable_cross_node_balancing {
            let mut balancer = self.memory_balancer.lock().map_err(|e| {
                TorshDistributedError::communication_error(
                    "memory_balancer",
                    format!("Lock error: {}", e),
                )
            })?;
            actions.extend(balancer.check_and_balance(&node_stats));
        }

        if self.config.enable_predictive_management {
            actions.extend(self.generate_predictive_optimizations(&node_stats)?);
        }

        // Highest priority first.
        actions.sort_by(|a, b| b.priority.cmp(&a.priority));

        Ok(actions)
    }
931
932 fn generate_optimization_actions(
934 &self,
935 node_id: &str,
936 stats: &NodeMemoryStats,
937 ) -> TorshResult<Vec<MemoryOptimizationAction>> {
938 let mut actions = Vec::new();
939
940 for technique in &self.config.enabled_techniques {
941 let (expected_savings, priority) = self.estimate_technique_benefits(technique, stats);
942
943 if expected_savings > 100 {
944 let action = MemoryOptimizationAction {
946 id: format!(
947 "opt_{}_{}_{}",
948 node_id,
949 SystemTime::now()
950 .duration_since(UNIX_EPOCH)
951 .expect("time should be after UNIX_EPOCH")
952 .as_millis(),
953 SystemTime::now()
954 .duration_since(UNIX_EPOCH)
955 .expect("time should be after UNIX_EPOCH")
956 .as_nanos()
957 % 1000
958 ),
959 target_node: node_id.to_string(),
960 technique: technique.clone(),
961 expected_savings_mb: expected_savings,
962 priority,
963 estimated_duration: self.estimate_execution_duration(technique),
964 status: OptimizationStatus::Pending,
965 created_at: SystemTime::now()
966 .duration_since(UNIX_EPOCH)
967 .expect("system time should be after UNIX_EPOCH")
968 .as_millis() as u64,
969 };
970
971 actions.push(action);
972 }
973 }
974
975 Ok(actions)
976 }
977
978 fn estimate_technique_benefits(
980 &self,
981 technique: &MemoryOptimizationTechnique,
982 stats: &NodeMemoryStats,
983 ) -> (u64, u32) {
984 match technique {
985 MemoryOptimizationTechnique::GradientAccumulation { accumulation_steps } => {
986 let savings = stats.allocated_memory_mb / (*accumulation_steps as u64).max(1);
987 (savings, 2)
988 }
989 MemoryOptimizationTechnique::ActivationCheckpointing { checkpoint_ratio } => {
990 let savings = (stats.allocated_memory_mb as f32 * checkpoint_ratio * 0.3) as u64;
991 (savings, 3)
992 }
993 MemoryOptimizationTechnique::CpuOffloading { .. } => {
994 let savings = stats.allocated_memory_mb / 4; (savings, 1)
996 }
997 MemoryOptimizationTechnique::ActivationCompression { compression_ratio } => {
998 let savings = (stats.allocated_memory_mb as f32 * compression_ratio * 0.2) as u64;
999 (savings, 2)
1000 }
1001 MemoryOptimizationTechnique::SmartGC { .. } => {
1002 let fragmentation_savings =
1003 (stats.fragmentation * stats.allocated_memory_mb as f32) as u64;
1004 (fragmentation_savings, 1)
1005 }
1006 MemoryOptimizationTechnique::CrossNodePooling { pool_size_mb } => (*pool_size_mb, 3),
1007 _ => (100, 1), }
1009 }
1010
1011 fn estimate_execution_duration(&self, technique: &MemoryOptimizationTechnique) -> Duration {
1013 match technique {
1014 MemoryOptimizationTechnique::GradientAccumulation { .. } => Duration::from_secs(1),
1015 MemoryOptimizationTechnique::ActivationCheckpointing { .. } => Duration::from_secs(5),
1016 MemoryOptimizationTechnique::CpuOffloading { .. } => Duration::from_secs(10),
1017 MemoryOptimizationTechnique::SmartGC { .. } => Duration::from_secs(2),
1018 MemoryOptimizationTechnique::CrossNodePooling { .. } => Duration::from_secs(15),
1019 _ => Duration::from_secs(5),
1020 }
1021 }
1022
1023 fn generate_predictive_optimizations(
1025 &self,
1026 node_stats: &HashMap<String, NodeMemoryStats>,
1027 ) -> TorshResult<Vec<MemoryOptimizationAction>> {
1028 let mut actions = Vec::new();
1029
1030 let predictor = self.memory_predictor.lock().map_err(|e| {
1031 TorshDistributedError::communication_error("predictor", format!("Lock error: {}", e))
1032 })?;
1033
1034 for (node_id, stats) in node_stats {
1035 if let Some(predicted_usage) = predictor.predict_memory_usage(node_id, 5) {
1037 if predicted_usage > 90.0 && stats.utilization_percent < 80.0 {
1038 let action = MemoryOptimizationAction {
1040 id: format!(
1041 "predictive_{}_{}",
1042 node_id,
1043 SystemTime::now()
1044 .duration_since(UNIX_EPOCH)
1045 .expect("system time should be after UNIX_EPOCH")
1046 .as_millis()
1047 ),
1048 target_node: node_id.clone(),
1049 technique: MemoryOptimizationTechnique::SmartGC {
1050 gc_threshold: 0.7,
1051 gc_interval: Duration::from_secs(15),
1052 },
1053 expected_savings_mb: (predicted_usage - stats.utilization_percent) as u64
1054 * 10,
1055 priority: 4, estimated_duration: Duration::from_secs(3),
1057 status: OptimizationStatus::Pending,
1058 created_at: SystemTime::now()
1059 .duration_since(UNIX_EPOCH)
1060 .expect("time should be after UNIX_EPOCH")
1061 .as_millis() as u64,
1062 };
1063
1064 actions.push(action);
1065 }
1066 }
1067 }
1068
1069 Ok(actions)
1070 }
1071
    /// Runs the active optimization identified by `action_id`.
    ///
    /// Errors if the action is not in the active set. The action is cloned
    /// out under a read lock, marked `Executing` under a separate write
    /// lock, then handed to the (simulated) executor.
    ///
    /// NOTE(review): between the read and write locks another thread could
    /// remove the action; the status update is then silently skipped while
    /// the simulation still runs on the cloned copy.
    pub fn execute_optimization(&self, action_id: &str) -> TorshResult<()> {
        let action = {
            let active_optimizations = self.active_optimizations.read().map_err(|e| {
                TorshDistributedError::communication_error(
                    "active_optimizations",
                    format!("Lock error: {}", e),
                )
            })?;
            active_optimizations
                .get(action_id)
                .cloned()
                .ok_or_else(|| {
                    TorshDistributedError::communication_error(
                        "execute_optimization",
                        format!("Action {} not found", action_id),
                    )
                })?
        };

        info!(
            "Executing memory optimization: {:?} on node {}",
            action.technique, action.target_node
        );

        // Mark the action as started (progress 0) in the active set.
        {
            let mut active_optimizations = self.active_optimizations.write().map_err(|e| {
                TorshDistributedError::communication_error(
                    "active_optimizations",
                    format!("Lock error: {}", e),
                )
            })?;
            if let Some(action) = active_optimizations.get_mut(action_id) {
                action.status = OptimizationStatus::Executing { progress: 0.0 };
            }
        }

        self.simulate_optimization_execution(action_id, &action)?;

        Ok(())
    }
1116
    /// Simulates running `action` to completion and records the outcome in
    /// the optimization history.
    ///
    /// NOTE(review): this is a simulation stub — it blocks the calling
    /// thread (~200 ms of sleeps) and fabricates the result from the system
    /// clock; replace with real execution when available.
    fn simulate_optimization_execution(
        &self,
        action_id: &str,
        action: &MemoryOptimizationAction,
    ) -> TorshResult<()> {
        let start_time = Instant::now();

        // Step the visible progress through four fixed checkpoints.
        for progress in [0.25, 0.5, 0.75, 1.0] {
            {
                let mut active_optimizations = self.active_optimizations.write().map_err(|e| {
                    TorshDistributedError::communication_error(
                        "active_optimizations",
                        format!("Lock error: {}", e),
                    )
                })?;
                if let Some(action) = active_optimizations.get_mut(action_id) {
                    action.status = OptimizationStatus::Executing { progress };
                }
            }

            // Blocking sleep between progress updates (simulation only).
            std::thread::sleep(Duration::from_millis(50));
        }

        // Pseudo-random outcome: fails when the nanosecond clock is a
        // multiple of 20 (~5% of runs).
        let success = (SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .expect("system time should be after UNIX_EPOCH")
            .as_nanos()
            % 20)
            != 0;
        let duration_ms = start_time.elapsed().as_millis() as u64;

        let final_status = if success {
            // Vary realized savings between 90% and 110% of the estimate.
            let variation = 0.9
                + (SystemTime::now()
                    .duration_since(UNIX_EPOCH)
                    .expect("system time should be after UNIX_EPOCH")
                    .as_nanos()
                    % 21) as f32
                    / 100.0;
            let actual_savings = (action.expected_savings_mb as f32 * variation) as u64;

            OptimizationStatus::Completed {
                actual_savings_mb: actual_savings,
                duration_ms,
            }
        } else {
            OptimizationStatus::Failed {
                error: "Simulated optimization failure".to_string(),
            }
        };

        // Move the finished action from the active set into the bounded
        // history. Note the lock order: the active-set write lock is held
        // while the history mutex is taken.
        {
            let mut active_optimizations = self.active_optimizations.write().map_err(|e| {
                TorshDistributedError::communication_error(
                    "active_optimizations",
                    format!("Lock error: {}", e),
                )
            })?;

            if let Some(mut action) = active_optimizations.remove(action_id) {
                action.status = final_status.clone();

                let mut history = self.optimization_history.lock().map_err(|e| {
                    TorshDistributedError::communication_error(
                        "optimization_history",
                        format!("Lock error: {}", e),
                    )
                })?;
                history.push_back(action);
                if history.len() > self.config.history_retention_size {
                    history.pop_front();
                }
            }
        }

        // Record the end of this round; rate-limits the next analysis pass.
        {
            let mut last_opt = self.last_optimization.lock().map_err(|e| {
                TorshDistributedError::communication_error(
                    "last_optimization",
                    format!("Lock error: {}", e),
                )
            })?;
            *last_opt = Instant::now();
        }

        info!(
            "Memory optimization {} completed with status: {:?}",
            action_id, final_status
        );
        Ok(())
    }
1216
1217 pub fn schedule_optimizations(
1219 &self,
1220 actions: Vec<MemoryOptimizationAction>,
1221 ) -> TorshResult<usize> {
1222 let mut scheduled_count = 0;
1223
1224 for action in actions {
1225 let active_count = {
1227 let active_optimizations = self.active_optimizations.read().map_err(|e| {
1228 TorshDistributedError::communication_error(
1229 "active_optimizations",
1230 format!("Lock error: {}", e),
1231 )
1232 })?;
1233 active_optimizations.len()
1234 };
1235
1236 if active_count >= self.config.max_concurrent_optimizations {
1237 break; }
1239
1240 {
1242 let mut active_optimizations = self.active_optimizations.write().map_err(|e| {
1243 TorshDistributedError::communication_error(
1244 "active_optimizations",
1245 format!("Lock error: {}", e),
1246 )
1247 })?;
1248 active_optimizations.insert(action.id.clone(), action.clone());
1249 }
1250
1251 self.execute_optimization(&action.id)?;
1253 scheduled_count += 1;
1254 }
1255
1256 info!(
1257 "Scheduled {} memory optimizations for execution",
1258 scheduled_count
1259 );
1260 Ok(scheduled_count)
1261 }
1262
    /// Produces a cluster-wide summary of memory state and optimization
    /// activity, aggregated from the live per-node statistics.
    ///
    /// Holds read locks on both the node-stats and active-optimization maps
    /// while building the summary; `calculate_optimization_efficiency` is
    /// called with those read locks still held (it takes the separate
    /// history mutex).
    pub fn get_optimization_status(&self) -> TorshResult<MemoryOptimizationStatus> {
        let node_stats = self.node_memory_stats.read().map_err(|e| {
            TorshDistributedError::communication_error("node_stats", format!("Lock error: {}", e))
        })?;

        let active_optimizations = self.active_optimizations.read().map_err(|e| {
            TorshDistributedError::communication_error(
                "active_optimizations",
                format!("Lock error: {}", e),
            )
        })?;

        let total_nodes = node_stats.len();
        // Nodes currently at or above the configured pressure threshold.
        let high_pressure_nodes = node_stats
            .values()
            .filter(|stats| stats.pressure_score >= self.config.pressure_threshold)
            .count();

        let total_memory_mb = node_stats.values().map(|s| s.total_memory_mb).sum();
        let allocated_memory_mb = node_stats.values().map(|s| s.allocated_memory_mb).sum();
        // Cluster-wide utilization; 0 when no capacity has been reported.
        let avg_utilization = if total_memory_mb > 0 {
            (allocated_memory_mb as f32 / total_memory_mb as f32) * 100.0
        } else {
            0.0
        };

        let avg_pressure_score = if total_nodes > 0 {
            node_stats.values().map(|s| s.pressure_score).sum::<f32>() / total_nodes as f32
        } else {
            0.0
        };

        Ok(MemoryOptimizationStatus {
            total_nodes,
            high_pressure_nodes,
            active_optimizations: active_optimizations.len(),
            avg_memory_utilization: avg_utilization,
            avg_pressure_score,
            total_memory_mb,
            allocated_memory_mb,
            optimization_efficiency: self.calculate_optimization_efficiency()?,
            timestamp_ms: SystemTime::now()
                .duration_since(UNIX_EPOCH)
                .expect("system time should be after UNIX_EPOCH")
                .as_millis() as u64,
        })
    }
1311
1312 fn calculate_optimization_efficiency(&self) -> TorshResult<f32> {
1314 let history = self.optimization_history.lock().map_err(|e| {
1315 TorshDistributedError::communication_error(
1316 "optimization_history",
1317 format!("Lock error: {}", e),
1318 )
1319 })?;
1320
1321 if history.is_empty() {
1322 return Ok(0.0);
1323 }
1324
1325 let completed_optimizations: Vec<_> = history
1326 .iter()
1327 .filter(|action| matches!(action.status, OptimizationStatus::Completed { .. }))
1328 .collect();
1329
1330 if completed_optimizations.is_empty() {
1331 return Ok(0.0);
1332 }
1333
1334 let total_expected: u64 = completed_optimizations
1335 .iter()
1336 .map(|action| action.expected_savings_mb)
1337 .sum();
1338
1339 let total_actual: u64 = completed_optimizations
1340 .iter()
1341 .filter_map(|action| {
1342 if let OptimizationStatus::Completed {
1343 actual_savings_mb, ..
1344 } = action.status
1345 {
1346 Some(actual_savings_mb)
1347 } else {
1348 None
1349 }
1350 })
1351 .sum();
1352
1353 if total_expected > 0 {
1354 Ok((total_actual as f32 / total_expected as f32).min(1.0))
1355 } else {
1356 Ok(0.0)
1357 }
1358 }
1359
1360 pub fn track_allocation(
1362 &self,
1363 node_id: String,
1364 size_mb: u64,
1365 allocation_type: String,
1366 success: bool,
1367 ) -> TorshResult<()> {
1368 let mut tracker = self.allocation_tracker.lock().map_err(|e| {
1369 TorshDistributedError::communication_error(
1370 "allocation_tracker",
1371 format!("Lock error: {}", e),
1372 )
1373 })?;
1374
1375 tracker.track_allocation(&node_id, size_mb, allocation_type, success);
1376 Ok(())
1377 }
1378
1379 pub fn get_allocation_prediction(&self, node_id: &str, minutes_ahead: u32) -> TorshResult<u64> {
1381 let tracker = self.allocation_tracker.lock().map_err(|e| {
1382 TorshDistributedError::communication_error(
1383 "allocation_tracker",
1384 format!("Lock error: {}", e),
1385 )
1386 })?;
1387
1388 Ok(tracker.get_allocation_prediction(node_id, minutes_ahead))
1389 }
1390
1391 pub fn export_optimization_data(&self) -> TorshResult<MemoryOptimizationExport> {
1393 let status = self.get_optimization_status()?;
1394
1395 let node_stats = self.node_memory_stats.read().map_err(|e| {
1396 TorshDistributedError::communication_error("node_stats", format!("Lock error: {}", e))
1397 })?;
1398
1399 let active_optimizations = self.active_optimizations.read().map_err(|e| {
1400 TorshDistributedError::communication_error(
1401 "active_optimizations",
1402 format!("Lock error: {}", e),
1403 )
1404 })?;
1405
1406 let optimization_history = self.optimization_history.lock().map_err(|e| {
1407 TorshDistributedError::communication_error(
1408 "optimization_history",
1409 format!("Lock error: {}", e),
1410 )
1411 })?;
1412
1413 Ok(MemoryOptimizationExport {
1414 status,
1415 node_memory_stats: node_stats.clone(),
1416 active_optimizations: active_optimizations.values().cloned().collect(),
1417 optimization_history: optimization_history.iter().cloned().collect(),
1418 config: self.config.clone(),
1419 export_timestamp_ms: SystemTime::now()
1420 .duration_since(UNIX_EPOCH)
1421 .expect("system time should be after UNIX_EPOCH")
1422 .as_millis() as u64,
1423 })
1424 }
1425}
1426
/// Point-in-time summary of cluster memory state and optimizer activity,
/// as produced by `get_optimization_status`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MemoryOptimizationStatus {
    /// Number of nodes with reported memory statistics.
    pub total_nodes: usize,
    /// Nodes whose pressure score is at or above the configured
    /// pressure threshold.
    pub high_pressure_nodes: usize,
    /// Count of currently active (scheduled, not yet retired) optimizations.
    pub active_optimizations: usize,
    /// Cluster-wide allocated/total memory ratio, as a percentage (0–100).
    pub avg_memory_utilization: f32,
    /// Mean pressure score across all reporting nodes.
    pub avg_pressure_score: f32,
    /// Sum of total memory across all nodes, in MB.
    pub total_memory_mb: u64,
    /// Sum of allocated memory across all nodes, in MB.
    pub allocated_memory_mb: u64,
    /// Ratio of actual to expected savings from completed optimizations,
    /// capped at 1.0 (0.0 when there is no completed history).
    pub optimization_efficiency: f32,
    /// Milliseconds since the UNIX epoch when this status was computed.
    pub timestamp_ms: u64,
}
1440
/// Full, owned snapshot of the optimizer's state for external analysis
/// or persistence, as produced by `export_optimization_data`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MemoryOptimizationExport {
    /// Summary status computed at export time.
    pub status: MemoryOptimizationStatus,
    /// Per-node memory statistics, keyed by node id.
    pub node_memory_stats: HashMap<String, NodeMemoryStats>,
    /// Optimizations that were active when the snapshot was taken.
    pub active_optimizations: Vec<MemoryOptimizationAction>,
    /// Historical (retired) optimization actions, oldest first.
    pub optimization_history: Vec<MemoryOptimizationAction>,
    /// Configuration in effect at export time.
    pub config: MemoryOptimizationConfig,
    /// Milliseconds since the UNIX epoch when the export was created.
    pub export_timestamp_ms: u64,
}
1451
#[cfg(test)]
mod tests {
    use super::*;
    use crate::distributed_monitoring::{DistributedMonitor, MonitoringConfig};

    /// Build a `NodeMemoryStats` fixture; derived fields (peak, free,
    /// utilization) are computed from `total`/`allocated` so the fixture
    /// stays internally consistent.
    fn make_node_stats(
        node_id: &str,
        total_memory_mb: u64,
        allocated_memory_mb: u64,
        pressure_score: f32,
        fragmentation: f32,
        allocation_rate_mbps: f32,
        deallocation_rate_mbps: f32,
    ) -> NodeMemoryStats {
        NodeMemoryStats {
            node_id: node_id.to_string(),
            total_memory_mb,
            allocated_memory_mb,
            peak_memory_mb: allocated_memory_mb,
            free_memory_mb: total_memory_mb - allocated_memory_mb,
            utilization_percent: (allocated_memory_mb as f32 / total_memory_mb as f32) * 100.0,
            pressure_score,
            fragmentation,
            allocation_failures: 0,
            allocation_rate_mbps,
            deallocation_rate_mbps,
            timestamp_ms: SystemTime::now()
                .duration_since(UNIX_EPOCH)
                .expect("system time should be after UNIX_EPOCH")
                .as_millis() as u64,
        }
    }

    #[tokio::test]
    async fn test_memory_optimizer_creation() -> TorshResult<()> {
        let monitor_config = MonitoringConfig::default();
        let monitor = Arc::new(DistributedMonitor::new(monitor_config, false));

        let config = MemoryOptimizationConfig::default();
        let optimizer = DistributedMemoryOptimizer::new(config, monitor);

        // A freshly constructed optimizer has no registered nodes.
        let status = optimizer.get_optimization_status()?;
        assert_eq!(status.total_nodes, 0);

        Ok(())
    }

    #[tokio::test]
    async fn test_linear_predictor() -> TorshResult<()> {
        let mut predictor = LinearPredictor::new();

        // Feed a strictly increasing linear series: y = 50 + 2x.
        for i in 0..30 {
            predictor.add_data_point(i as f32, 50.0 + i as f32 * 2.0);
        }

        let predicted = predictor.predict(35.0);
        assert!(
            predicted > 0.0,
            "Prediction should be positive, got {}",
            predicted
        );

        Ok(())
    }

    #[tokio::test]
    async fn test_allocation_tracker() -> TorshResult<()> {
        let mut tracker = AllocationTracker::new();

        // Record a steadily growing allocation pattern for one node.
        for i in 0..20 {
            tracker.track_allocation("node1", 100 + i * 10, "model".to_string(), true);
        }

        let prediction = tracker.get_allocation_prediction("node1", 5);
        assert!(prediction > 0);

        Ok(())
    }

    #[tokio::test]
    async fn test_memory_balancer() -> TorshResult<()> {
        let mut balancer = MemoryBalancer::new(20.0);

        let mut node_stats = HashMap::new();
        node_stats.insert(
            "node1".to_string(),
            make_node_stats("node1", 16000, 14000, 0.8, 0.1, 10.0, 8.0),
        );
        node_stats.insert(
            "node2".to_string(),
            make_node_stats("node2", 16000, 8000, 0.2, 0.05, 5.0, 4.0),
        );

        let actions = balancer.check_and_balance(&node_stats);

        // The original assertion (`is_empty() || !is_empty()`) was a
        // tautology that could never fail. The 37.5-point utilization gap
        // between node1 (87.5%) and node2 (50.0%) exceeds the 20.0
        // rebalance threshold, so the balancer is expected to propose at
        // least one action — NOTE(review): confirm `check_and_balance` has
        // no additional gating (e.g. a minimum rebalance interval) that
        // would suppress actions on the first call.
        assert!(
            !actions.is_empty(),
            "expected rebalancing actions for an imbalanced cluster"
        );

        Ok(())
    }
}