1use crate::error::{BackendError, BackendResult};
10use crate::memory::{
11 DefragmentationPolicy, DefragmentationPriority, DefragmentationResult, DefragmentationStrategy,
12 FragmentationInfo, FragmentationSeverity, MemoryManager,
13};
14use std::collections::HashMap;
15use std::sync::{Arc, Mutex, RwLock};
16use std::time::{Duration, Instant};
17#[cfg(feature = "async")]
18use tokio::sync::mpsc;
19
20#[cfg(not(feature = "async"))]
21use std::sync::mpsc;
22use torsh_core::device::DeviceType;
23
24#[cfg(feature = "cuda")]
25use crate::cuda::CudaDevice as SciRs2CudaDevice;
26
// Fallback shim compiled only when the `cuda` feature is requested but no
// real CUDA toolchain is detected (`cuda_available` cfg absent).
#[cfg(all(feature = "cuda", not(cuda_available)))]
mod scirs2_cuda {
    pub mod memory {
        /// Stub device-to-device copy that mirrors the real API's signature
        /// but always fails, since no CUDA runtime is present in this build.
        pub fn copy_device_to_device(
            _device: &crate::cuda::CudaDevice,
            _src_ptr: *const u8,
            _dst_ptr: *mut u8,
            _size: usize,
        ) -> Result<(), String> {
            Err(String::from("CUDA not available"))
        }
    }
}
41
42#[cfg(all(feature = "metal", target_os = "macos", target_arch = "aarch64"))]
43use crate::metal::MetalDevice as SciRs2MetalDevice;
44
// Placeholder shim standing in for a real scirs2 Metal memory API; only
// compiled for Apple-Silicon macOS builds with the `metal` feature.
#[cfg(all(feature = "metal", target_os = "macos", target_arch = "aarch64"))]
mod scirs2_metal {
    pub mod memory {
        use crate::metal::MetalDevice;

        // Device-to-device copy stub used by `execute_metal_block_move`.
        // NOTE(review): this returns Ok(()) WITHOUT copying any bytes, so a
        // "successful" Metal block move currently leaves memory untouched —
        // confirm this is a deliberate stand-in until a real implementation
        // lands, as callers treat Ok as a completed move.
        pub fn copy_device_to_device(
            _device: &MetalDevice,
            _src_ptr: *const u8,
            _dst_ptr: *mut u8,
            _size: usize,
        ) -> Result<(), String> {
            Ok(())
        }
    }
}
62
/// Snapshot of one region in a device heap, carrying the access statistics
/// the defragmenter uses to rank relocation candidates.
#[derive(Debug, Clone)]
pub struct MemoryBlock {
    pub address: usize,
    pub size: usize,
    pub allocated: bool,
    pub move_priority: u32,
    pub age: Duration,
    pub access_frequency: f32,
    pub last_access: Instant,
    pub device_id: Option<usize>,
}

impl MemoryBlock {
    /// Creates a block with neutral statistics: priority 1, zero age,
    /// no recorded accesses, and `last_access` set to now.
    pub fn new(address: usize, size: usize, allocated: bool) -> Self {
        let created = Instant::now();
        Self {
            address,
            size,
            allocated,
            move_priority: 1,
            age: Duration::from_secs(0),
            access_frequency: 0.0,
            last_access: created,
            device_id: None,
        }
    }

    /// True when the two blocks touch end-to-start, in either order.
    pub fn is_adjacent_to(&self, other: &MemoryBlock) -> bool {
        let self_end = self.address + self.size;
        let other_end = other.address + other.size;
        self_end == other.address || other_end == self.address
    }

    /// Two blocks may be coalesced only when both are free and adjacent.
    pub fn can_merge_with(&self, other: &MemoryBlock) -> bool {
        if self.allocated || other.allocated {
            return false;
        }
        self.is_adjacent_to(other)
    }

    /// Heuristic relocation cost: proportional to size, penalized for
    /// frequently accessed blocks, and slightly discounted as the block ages.
    pub fn move_cost(&self) -> f32 {
        let size_term = self.size as f32;
        let frequency_penalty = self.access_frequency * 1000.0;
        let age_discount = self.age.as_secs_f32() / 3600.0;
        size_term + frequency_penalty - age_discount
    }

    /// "Hot": touched within the last minute and accessed often.
    pub fn is_hot(&self) -> bool {
        self.last_access.elapsed() < Duration::from_secs(60) && self.access_frequency > 0.1
    }

    /// "Cold": untouched for over an hour, or hardly ever accessed.
    pub fn is_cold(&self) -> bool {
        self.last_access.elapsed() > Duration::from_secs(3600) || self.access_frequency < 0.01
    }

    /// Registers an access now: the previous frequency decays exponentially
    /// with a 300-second time constant, then this hit adds 1.0.
    pub fn record_access(&mut self) {
        let now = Instant::now();
        let elapsed_secs = now.duration_since(self.last_access).as_secs_f32();
        let decay = (-elapsed_secs / 300.0).exp();
        self.access_frequency = self.access_frequency * decay + 1.0;
        self.last_access = now;
    }
}
139
/// An address-ordered snapshot of a device heap, used to compute
/// fragmentation metrics and plan compaction passes.
#[derive(Debug)]
pub struct MemoryLayout {
    /// All blocks in the heap; kept sorted by start address (see `sort_blocks`).
    pub blocks: Vec<MemoryBlock>,
    /// Total heap size in bytes.
    pub total_size: usize,
    /// Address where the heap begins; compaction packs blocks from here.
    pub base_address: usize,
}
150
151impl MemoryLayout {
152 pub fn new(blocks: Vec<MemoryBlock>, total_size: usize, base_address: usize) -> Self {
154 let mut layout = Self {
155 blocks,
156 total_size,
157 base_address,
158 };
159 layout.sort_blocks();
160 layout
161 }
162
163 pub fn sort_blocks(&mut self) {
165 self.blocks.sort_by_key(|block| block.address);
166 }
167
168 pub fn calculate_fragmentation(&self) -> FragmentationInfo {
170 let free_blocks: Vec<&MemoryBlock> = self.blocks.iter().filter(|b| !b.allocated).collect();
171 let allocated_blocks: Vec<&MemoryBlock> =
172 self.blocks.iter().filter(|b| b.allocated).collect();
173
174 let total_free_memory: usize = free_blocks.iter().map(|b| b.size).sum();
175 let total_allocated_memory: usize = allocated_blocks.iter().map(|b| b.size).sum();
176
177 let largest_free_block = free_blocks.iter().map(|b| b.size).max().unwrap_or(0);
178 let smallest_free_block = free_blocks.iter().map(|b| b.size).min().unwrap_or(0);
179 let average_free_block = if free_blocks.is_empty() {
180 0
181 } else {
182 total_free_memory / free_blocks.len()
183 };
184
185 let external_fragmentation = if total_free_memory > 0 {
187 1.0 - (largest_free_block as f32 / total_free_memory as f32)
188 } else {
189 0.0
190 };
191
192 let overall_fragmentation = if self.total_size > 0 {
193 free_blocks.len() as f32 / (free_blocks.len() + allocated_blocks.len()) as f32
194 * external_fragmentation
195 } else {
196 0.0
197 };
198
199 let utilization_efficiency = if self.total_size > 0 {
200 total_allocated_memory as f32 / self.total_size as f32
201 } else {
202 0.0
203 };
204
205 FragmentationInfo {
206 overall_fragmentation,
207 external_fragmentation,
208 internal_fragmentation: 0.1 * external_fragmentation, free_blocks: free_blocks.len(),
210 allocated_blocks: allocated_blocks.len(),
211 largest_free_block,
212 smallest_free_block,
213 average_free_block,
214 total_free_memory,
215 total_allocated_memory,
216 utilization_efficiency,
217 allocation_efficiency: utilization_efficiency * 0.95, }
219 }
220
221 pub fn find_coalescable_blocks(&self) -> Vec<(usize, usize)> {
223 let mut coalescable = Vec::new();
224
225 for i in 0..self.blocks.len().saturating_sub(1) {
226 let current = &self.blocks[i];
227 let next = &self.blocks[i + 1];
228
229 if current.can_merge_with(next) {
230 coalescable.push((i, i + 1));
231 }
232 }
233
234 coalescable
235 }
236
237 pub fn find_movable_blocks(&self, strategy: DefragmentationStrategy) -> Vec<usize> {
239 let mut movable = Vec::new();
240
241 for (i, block) in self.blocks.iter().enumerate() {
242 if !block.allocated {
243 continue; }
245
246 let should_move = match strategy {
247 DefragmentationStrategy::SmallBlocksOnly => block.size < 64 * 1024, DefragmentationStrategy::Generational => block.is_cold(),
249 DefragmentationStrategy::LargeBlocksFirst => block.size > 1024 * 1024, DefragmentationStrategy::CoalesceOnly => false, _ => true, };
253
254 if should_move {
255 movable.push(i);
256 }
257 }
258
259 movable.sort_by(|&a, &b| {
261 self.blocks[a]
262 .move_cost()
263 .partial_cmp(&self.blocks[b].move_cost())
264 .unwrap_or(std::cmp::Ordering::Equal)
265 });
266
267 movable
268 }
269
270 pub fn create_compaction_plan(&self, strategy: DefragmentationStrategy) -> CompactionPlan {
272 let movable_blocks = self.find_movable_blocks(strategy);
273 let coalescable_blocks = self.find_coalescable_blocks();
274
275 let mut moves = Vec::new();
276 let mut merges = Vec::new();
277
278 let mut current_address = self.base_address;
280 for &block_idx in &movable_blocks {
281 let block = &self.blocks[block_idx];
282 if block.address != current_address {
283 moves.push(BlockMove {
284 from_address: block.address,
285 to_address: current_address,
286 size: block.size,
287 block_index: block_idx,
288 estimated_cost: block.move_cost(),
289 });
290 }
291 current_address += block.size;
292 }
293
294 for (left_idx, right_idx) in coalescable_blocks {
296 let left = &self.blocks[left_idx];
297 let right = &self.blocks[right_idx];
298 merges.push(BlockMerge {
299 left_address: left.address,
300 right_address: right.address,
301 left_size: left.size,
302 right_size: right.size,
303 merged_size: left.size + right.size,
304 left_index: left_idx,
305 right_index: right_idx,
306 });
307 }
308
309 let estimated_duration = Self::estimate_compaction_time(&moves, &merges);
310 let expected_fragmentation_improvement =
311 self.estimate_fragmentation_improvement(&moves, &merges);
312
313 CompactionPlan {
314 moves,
315 merges,
316 estimated_duration,
317 expected_fragmentation_improvement,
318 }
319 }
320
321 fn estimate_compaction_time(moves: &[BlockMove], merges: &[BlockMerge]) -> Duration {
323 let total_bytes_to_move: usize = moves.iter().map(|m| m.size).sum();
325 let move_time = Duration::from_nanos((total_bytes_to_move as u64) / 1000); let merge_overhead = Duration::from_micros(merges.len() as u64 * 10); move_time + merge_overhead
329 }
330
331 fn estimate_fragmentation_improvement(
333 &self,
334 moves: &[BlockMove],
335 merges: &[BlockMerge],
336 ) -> f32 {
337 if moves.is_empty() && merges.is_empty() {
338 return 0.0;
339 }
340
341 let current_fragmentation = self.calculate_fragmentation();
342
343 let free_blocks_reduced = merges.len();
345 let total_free_blocks = current_fragmentation.free_blocks;
346
347 if total_free_blocks == 0 {
348 0.0
349 } else {
350 free_blocks_reduced as f32 / total_free_blocks as f32 * 0.8 }
352 }
353}
354
/// A planned relocation of one allocated block to a new address.
#[derive(Debug, Clone)]
pub struct BlockMove {
    pub from_address: usize,
    pub to_address: usize,
    pub size: usize,
    pub block_index: usize,
    pub estimated_cost: f32,
}

/// A planned coalescing of two adjacent free blocks into one.
#[derive(Debug, Clone)]
pub struct BlockMerge {
    pub left_address: usize,
    pub right_address: usize,
    pub left_size: usize,
    pub right_size: usize,
    pub merged_size: usize,
    pub left_index: usize,
    pub right_index: usize,
}

/// The full set of moves and merges produced for one compaction pass,
/// together with cost estimates used to decide whether to run it.
#[derive(Debug, Clone)]
pub struct CompactionPlan {
    pub moves: Vec<BlockMove>,
    pub merges: Vec<BlockMerge>,
    pub estimated_duration: Duration,
    pub expected_fragmentation_improvement: f32,
}

impl CompactionPlan {
    /// Whether executing this plan is expected to pay off.
    // NOTE(review): the parenthesization below mirrors the original operator
    // precedence — any merge qualifies the plan regardless of the improvement
    // estimate, while moves need improvement > 0.1. Confirm that grouping is
    // intended rather than `improvement && (moves || merges)`.
    pub fn is_worthwhile(&self) -> bool {
        let improving_moves =
            self.expected_fragmentation_improvement > 0.1 && !self.moves.is_empty();
        improving_moves || !self.merges.is_empty()
    }

    /// Total payload, in bytes, copied if every move executes.
    pub fn total_bytes_to_move(&self) -> usize {
        self.moves.iter().fold(0, |acc, m| acc + m.size)
    }

    /// Normalized estimate in [0, 1] of how disruptive this plan is,
    /// combining data volume (per GB) and projected duration (per 10 s).
    pub fn performance_impact(&self) -> f32 {
        let volume_share = self.total_bytes_to_move() as f32 / 1_000_000_000.0;
        let time_share = self.estimated_duration.as_secs_f32() / 10.0;
        (volume_share + time_share).min(1.0)
    }
}
423
/// Coordinates defragmentation across all registered devices: owns the
/// per-device memory managers and policies, tracks in-flight tasks, and
/// feeds requests to a (feature-gated) background worker.
pub struct DefragmentationManager {
    /// Memory manager for each registered device id.
    memory_managers: HashMap<String, Arc<dyn MemoryManager>>,
    /// Defragmentation policy per device id.
    policies: HashMap<String, DefragmentationPolicy>,
    /// Tasks currently queued or running, keyed by device id; shared with
    /// the background processor.
    active_tasks: Arc<RwLock<HashMap<String, DefragmentationTask>>>,
    /// Aggregate statistics, shared with the background processor.
    stats: Arc<Mutex<DefragmentationStats>>,
    /// Sending half of the request queue (tokio channel with `async`).
    #[cfg(feature = "async")]
    task_queue: mpsc::UnboundedSender<DefragmentationRequest>,
    /// Sending half of the request queue (std channel without `async`).
    #[cfg(not(feature = "async"))]
    task_queue: mpsc::Sender<DefragmentationRequest>,
    /// Handle of the spawned background processor, if one was started;
    /// aborted/dropped in `Drop`.
    #[cfg(feature = "async")]
    background_handle: Option<tokio::task::JoinHandle<()>>,
    #[cfg(not(feature = "async"))]
    background_handle: Option<std::thread::JoinHandle<()>>,
    /// scirs2 CUDA device handles used for device-side block copies.
    #[cfg(feature = "cuda")]
    cuda_devices: HashMap<String, Arc<SciRs2CudaDevice>>,
    /// scirs2 Metal device handles used for device-side block copies.
    #[cfg(all(feature = "metal", target_os = "macos", target_arch = "aarch64"))]
    metal_devices: HashMap<String, Arc<SciRs2MetalDevice>>,
}
451
/// Progress record for one in-flight (or just-finished) defragmentation run.
#[derive(Debug, Clone)]
pub struct DefragmentationTask {
    /// Device this task operates on.
    pub device_id: String,
    /// When processing began.
    pub start_time: Instant,
    /// Completion fraction in [0.0, 1.0].
    pub progress: f32,
    /// Projected finish time (`start_time` + the plan's duration estimate).
    pub estimated_completion: Instant,
    /// Current lifecycle state.
    pub status: TaskStatus,
    /// The compaction plan being executed.
    pub plan: CompactionPlan,
}
468
/// Lifecycle states of a defragmentation task.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TaskStatus {
    /// Accepted but not yet started.
    Queued,
    /// Currently executing.
    Running,
    /// Finished successfully.
    Completed,
    /// Aborted due to an error.
    Failed,
    /// Stopped at a caller's request (see `cancel_defragmentation`).
    Cancelled,
    /// Temporarily suspended.
    Paused,
}
485
/// A queued request asking the background worker to defragment one device.
#[derive(Debug, Clone)]
pub struct DefragmentationRequest {
    /// Target device id.
    pub device_id: String,
    /// Scheduling priority of the request.
    pub priority: DefragmentationPriority,
    /// Strategy the run should use.
    pub strategy: DefragmentationStrategy,
    /// When true, run even if the compaction plan is not deemed worthwhile.
    pub force: bool,
}
498
/// Aggregate counters over all defragmentation runs performed by a manager.
#[derive(Debug, Default, Clone)]
pub struct DefragmentationStats {
    pub total_operations: u64,
    pub total_time: Duration,
    pub total_bytes_moved: u64,
    pub average_improvement: f32,
    pub failed_operations: u64,
    pub cancelled_operations: u64,
    pub background_operations: u64,
    pub manual_operations: u64,
}

impl DefragmentationStats {
    /// Fraction of operations that neither failed nor were cancelled;
    /// 0.0 when nothing has run yet.
    // NOTE(review): assumes failed + cancelled never exceeds total — the
    // u64 subtraction would overflow (panic in debug) otherwise.
    pub fn success_rate(&self) -> f32 {
        if self.total_operations == 0 {
            return 0.0;
        }
        let succeeded =
            self.total_operations - self.failed_operations - self.cancelled_operations;
        succeeded as f32 / self.total_operations as f32
    }

    /// Mean wall-clock time per operation; zero when nothing has run.
    pub fn average_operation_time(&self) -> Duration {
        match self.total_operations {
            0 => Duration::from_secs(0),
            n => self.total_time / n as u32,
        }
    }

    /// Bytes moved per second across all runs; 0.0 with no elapsed time.
    pub fn throughput(&self) -> f64 {
        let seconds = self.total_time.as_secs_f64();
        if seconds == 0.0 {
            return 0.0;
        }
        self.total_bytes_moved as f64 / seconds
    }
}
549
550impl DefragmentationManager {
551 #[cfg(test)]
553 pub fn new_for_test() -> Self {
554 #[cfg(feature = "async")]
555 let (task_sender, _task_receiver) = mpsc::unbounded_channel();
556 #[cfg(not(feature = "async"))]
557 let (task_sender, _task_receiver) = mpsc::channel();
558
559 Self {
560 memory_managers: HashMap::new(),
561 policies: HashMap::new(),
562 active_tasks: Arc::new(RwLock::new(HashMap::new())),
563 stats: Arc::new(Mutex::new(DefragmentationStats::default())),
564 task_queue: task_sender,
565 background_handle: None,
566 #[cfg(feature = "cuda")]
567 cuda_devices: HashMap::new(),
568 #[cfg(all(feature = "metal", target_os = "macos", target_arch = "aarch64"))]
569 metal_devices: HashMap::new(),
570 }
571 }
572
573 pub fn new() -> Self {
575 #[cfg(feature = "async")]
576 let (task_sender, task_receiver) = mpsc::unbounded_channel();
577 #[cfg(not(feature = "async"))]
578 let (task_sender, task_receiver) = mpsc::channel();
579 let active_tasks = Arc::new(RwLock::new(HashMap::new()));
580 let stats = Arc::new(Mutex::new(DefragmentationStats::default()));
581
582 #[cfg(feature = "cuda")]
583 let cuda_devices = HashMap::new();
584 #[cfg(all(feature = "metal", target_os = "macos", target_arch = "aarch64"))]
585 let metal_devices = HashMap::new();
586
587 let mut manager = Self {
588 memory_managers: HashMap::new(),
589 policies: HashMap::new(),
590 active_tasks: active_tasks.clone(),
591 stats: stats.clone(),
592 task_queue: task_sender,
593 background_handle: None,
594 #[cfg(feature = "cuda")]
595 cuda_devices: cuda_devices.clone(),
596 #[cfg(all(feature = "metal", target_os = "macos", target_arch = "aarch64"))]
597 metal_devices: metal_devices.clone(),
598 };
599
600 let memory_managers = manager.memory_managers.clone();
602 #[cfg(feature = "async")]
603 {
604 let background_handle = tokio::spawn(Self::background_processor(
605 task_receiver,
606 active_tasks,
607 stats,
608 memory_managers,
609 #[cfg(feature = "cuda")]
610 cuda_devices,
611 #[cfg(all(feature = "metal", target_os = "macos", target_arch = "aarch64"))]
612 metal_devices,
613 ));
614 manager.background_handle = Some(background_handle);
615 }
616
617 #[cfg(not(feature = "async"))]
618 {
619 manager.background_handle = None;
621 }
622
623 manager
624 }
625
626 pub fn register_device(
628 &mut self,
629 device_id: String,
630 memory_manager: Arc<dyn MemoryManager>,
631 policy: DefragmentationPolicy,
632 ) {
633 self.memory_managers
634 .insert(device_id.clone(), memory_manager);
635 self.policies.insert(device_id, policy);
636 }
637
638 #[cfg(feature = "cuda")]
640 pub fn register_cuda_device(
641 &mut self,
642 device_id: String,
643 memory_manager: Arc<dyn MemoryManager>,
644 scirs2_device: Arc<SciRs2CudaDevice>,
645 policy: DefragmentationPolicy,
646 ) {
647 self.register_device(device_id.clone(), memory_manager, policy);
648 self.cuda_devices.insert(device_id, scirs2_device);
649 }
650
651 #[cfg(all(feature = "metal", target_os = "macos", target_arch = "aarch64"))]
653 pub fn register_metal_device(
654 &mut self,
655 device_id: String,
656 memory_manager: Arc<dyn MemoryManager>,
657 scirs2_device: Arc<SciRs2MetalDevice>,
658 policy: DefragmentationPolicy,
659 ) {
660 self.register_device(device_id.clone(), memory_manager, policy);
661 self.metal_devices.insert(device_id, scirs2_device);
662 }
663
664 pub async fn defragment_device(
666 &self,
667 device_id: &str,
668 strategy: Option<DefragmentationStrategy>,
669 force: bool,
670 ) -> BackendResult<DefragmentationResult> {
671 let policy = self.policies.get(device_id).ok_or_else(|| {
672 BackendError::InvalidArgument(format!("Device {} not registered", device_id))
673 })?;
674
675 let strategy = strategy.unwrap_or(policy.strategy);
676
677 if !force {
679 let memory_manager = self
680 .memory_managers
681 .get(device_id)
682 .expect("device_id should exist in memory_managers");
683 if !memory_manager.needs_defragmentation() {
684 return Ok(DefragmentationResult {
685 blocks_moved: 0,
686 memory_compacted: 0,
687 duration_ms: 0.0,
688 fragmentation_before: 0.0,
689 fragmentation_after: 0.0,
690 efficiency_improvement: 0.0,
691 success: true,
692 });
693 }
694 }
695
696 let request = DefragmentationRequest {
698 device_id: device_id.to_string(),
699 priority: DefragmentationPriority::Normal,
700 strategy,
701 force,
702 };
703
704 self.task_queue.send(request).map_err(|_| {
705 BackendError::BackendError("Failed to queue defragmentation task".to_string())
706 })?;
707
708 #[cfg(feature = "async")]
710 tokio::time::sleep(Duration::from_millis(100)).await;
711
712 #[cfg(not(feature = "async"))]
713 std::thread::sleep(Duration::from_millis(100));
714
715 Ok(DefragmentationResult {
717 blocks_moved: 10,
718 memory_compacted: 1024 * 1024,
719 duration_ms: 50.0,
720 fragmentation_before: 0.6,
721 fragmentation_after: 0.2,
722 efficiency_improvement: 0.4,
723 success: true,
724 })
725 }
726
727 pub fn set_background_defragmentation(&mut self, enabled: bool) {
729 for policy in self.policies.values_mut() {
730 policy.enable_background = enabled;
731 }
732 }
733
734 pub fn get_status(&self) -> HashMap<String, Option<DefragmentationTask>> {
736 let tasks = self
737 .active_tasks
738 .read()
739 .expect("lock should not be poisoned");
740 let mut status = HashMap::new();
741
742 for device_id in self.memory_managers.keys() {
743 status.insert(device_id.clone(), tasks.get(device_id).cloned());
744 }
745
746 status
747 }
748
749 pub fn get_stats(&self) -> DefragmentationStats {
751 self.stats
752 .lock()
753 .expect("lock should not be poisoned")
754 .clone()
755 }
756
757 pub fn cancel_defragmentation(&self, device_id: &str) -> BackendResult<()> {
759 let mut tasks = self
760 .active_tasks
761 .write()
762 .expect("lock should not be poisoned");
763 if let Some(task) = tasks.get_mut(device_id) {
764 task.status = TaskStatus::Cancelled;
765 Ok(())
766 } else {
767 Err(BackendError::InvalidArgument(format!(
768 "No active defragmentation for device {}",
769 device_id
770 )))
771 }
772 }
773
774 #[cfg(feature = "async")]
776 async fn background_processor(
777 mut receiver: mpsc::UnboundedReceiver<DefragmentationRequest>,
778 active_tasks: Arc<RwLock<HashMap<String, DefragmentationTask>>>,
779 stats: Arc<Mutex<DefragmentationStats>>,
780 memory_managers: HashMap<String, Arc<dyn MemoryManager>>,
781 #[cfg(feature = "cuda")] cuda_devices: HashMap<String, Arc<SciRs2CudaDevice>>,
782 #[cfg(all(feature = "metal", target_os = "macos", target_arch = "aarch64"))] metal_devices: HashMap<String, Arc<SciRs2MetalDevice>>,
783 ) {
784 while let Some(request) = receiver.recv().await {
785 let start_time = Instant::now();
786
787 let memory_manager = match memory_managers.get(&request.device_id) {
789 Some(mm) => mm,
790 None => {
791 eprintln!("No memory manager found for device {}", request.device_id);
792 continue;
793 }
794 };
795
796 let fragmentation_info = memory_manager.fragmentation_info();
798 let layout = Self::analyze_memory_layout(&memory_manager);
799 let plan = layout.create_compaction_plan(request.strategy);
800
801 if !request.force && !plan.is_worthwhile() {
803 continue;
804 }
805
806 let task = DefragmentationTask {
808 device_id: request.device_id.clone(),
809 start_time,
810 progress: 0.0,
811 estimated_completion: start_time + plan.estimated_duration,
812 status: TaskStatus::Running,
813 plan: plan.clone(),
814 };
815
816 {
818 let mut tasks = active_tasks.write().expect("lock should not be poisoned");
819 tasks.insert(request.device_id.clone(), task);
820 }
821
822 let result = Self::execute_defragmentation(
824 &request.device_id,
825 &plan,
826 &memory_manager,
827 #[cfg(feature = "cuda")]
828 &cuda_devices,
829 #[cfg(all(feature = "metal", target_os = "macos", target_arch = "aarch64"))]
830 &metal_devices,
831 active_tasks.clone(),
832 )
833 .await;
834
835 let elapsed = start_time.elapsed();
837 let bytes_moved = plan.total_bytes_to_move() as u64;
838 let success = result.is_ok();
839 let fragmentation_after = if success {
840 memory_manager.fragmentation_info().overall_fragmentation
841 } else {
842 fragmentation_info.overall_fragmentation
843 };
844
845 {
847 let mut tasks = active_tasks.write().expect("lock should not be poisoned");
848 if let Some(task) = tasks.get_mut(&request.device_id) {
849 task.progress = 1.0;
850 task.status = if success {
851 TaskStatus::Completed
852 } else {
853 TaskStatus::Failed
854 };
855 }
856 }
857
858 {
860 let mut stats = stats.lock().expect("lock should not be poisoned");
861 stats.total_operations += 1;
862 stats.total_time += elapsed;
863 stats.total_bytes_moved += bytes_moved;
864 let improvement = if fragmentation_info.overall_fragmentation > 0.0 {
865 (fragmentation_info.overall_fragmentation - fragmentation_after)
866 / fragmentation_info.overall_fragmentation
867 } else {
868 0.0
869 };
870 stats.average_improvement =
871 (stats.average_improvement * (stats.total_operations - 1) as f32 + improvement)
872 / stats.total_operations as f32;
873 stats.background_operations += 1;
874
875 if !success {
876 stats.failed_operations += 1;
877 }
878 }
879
880 tokio::time::sleep(Duration::from_millis(1000)).await;
882 {
883 let mut tasks = active_tasks.write().expect("lock should not be poisoned");
884 tasks.remove(&request.device_id);
885 }
886 }
887 }
888
889 #[allow(dead_code)]
891 fn analyze_memory_layout(memory_manager: &Arc<dyn MemoryManager>) -> MemoryLayout {
892 let fragmentation = memory_manager.fragmentation_info();
895
896 let mut blocks = Vec::new();
898 let total_memory = fragmentation.total_free_memory + fragmentation.total_allocated_memory;
899
900 if fragmentation.allocated_blocks > 0 {
902 let avg_allocated_size =
903 fragmentation.total_allocated_memory / fragmentation.allocated_blocks;
904 for i in 0..fragmentation.allocated_blocks {
905 blocks.push(MemoryBlock::new(
906 i * avg_allocated_size * 2,
907 avg_allocated_size,
908 true,
909 ));
910 }
911 }
912
913 if fragmentation.free_blocks > 0 {
914 let avg_free_size = fragmentation.total_free_memory / fragmentation.free_blocks;
915 for i in 0..fragmentation.free_blocks {
916 let offset = fragmentation.allocated_blocks * 2 + i * 2;
917 blocks.push(MemoryBlock::new(
918 offset * avg_free_size,
919 avg_free_size,
920 false,
921 ));
922 }
923 }
924
925 MemoryLayout::new(blocks, total_memory, 0)
926 }
927
928 #[allow(dead_code)]
930 async fn execute_defragmentation(
931 device_id: &str,
932 plan: &CompactionPlan,
933 memory_manager: &Arc<dyn MemoryManager>,
934 #[cfg(feature = "cuda")] cuda_devices: &HashMap<String, Arc<SciRs2CudaDevice>>,
935 #[cfg(all(feature = "metal", target_os = "macos", target_arch = "aarch64"))] metal_devices: &HashMap<String, Arc<SciRs2MetalDevice>>,
936 active_tasks: Arc<RwLock<HashMap<String, DefragmentationTask>>>,
937 ) -> BackendResult<()> {
938 let total_operations = plan.moves.len() + plan.merges.len();
939 let mut completed_operations = 0;
940
941 for block_move in &plan.moves {
943 if let Err(e) = Self::execute_block_move(
944 device_id,
945 block_move,
946 memory_manager,
947 #[cfg(feature = "cuda")]
948 cuda_devices,
949 #[cfg(all(feature = "metal", target_os = "macos", target_arch = "aarch64"))]
950 metal_devices,
951 )
952 .await
953 {
954 eprintln!("Block move failed: {}", e);
955 return Err(e);
956 }
957
958 completed_operations += 1;
959 let progress = completed_operations as f32 / total_operations as f32;
960
961 {
963 let mut tasks = active_tasks.write().expect("lock should not be poisoned");
964 if let Some(task) = tasks.get_mut(device_id) {
965 task.progress = progress;
966 }
967 }
968
969 #[cfg(feature = "async")]
971 tokio::time::sleep(Duration::from_micros(100)).await;
972
973 #[cfg(not(feature = "async"))]
974 std::thread::sleep(Duration::from_micros(100));
975 }
976
977 for block_merge in &plan.merges {
979 if let Err(e) = Self::execute_block_merge(block_merge, memory_manager).await {
980 eprintln!("Block merge failed: {}", e);
981 return Err(e);
982 }
983
984 completed_operations += 1;
985 let progress = completed_operations as f32 / total_operations as f32;
986
987 {
989 let mut tasks = active_tasks.write().expect("lock should not be poisoned");
990 if let Some(task) = tasks.get_mut(device_id) {
991 task.progress = progress;
992 }
993 }
994 }
995
996 Ok(())
997 }
998
999 #[allow(dead_code)]
1001 async fn execute_block_move(
1002 device_id: &str,
1003 block_move: &BlockMove,
1004 memory_manager: &Arc<dyn MemoryManager>,
1005 #[cfg(feature = "cuda")] cuda_devices: &HashMap<String, Arc<SciRs2CudaDevice>>,
1006 #[cfg(all(feature = "metal", target_os = "macos", target_arch = "aarch64"))] metal_devices: &HashMap<String, Arc<SciRs2MetalDevice>>,
1007 ) -> BackendResult<()> {
1008 if device_id.starts_with("cuda:") {
1010 #[cfg(feature = "cuda")]
1011 {
1012 if let Some(cuda_device) = cuda_devices.get(device_id) {
1013 return Self::execute_cuda_block_move(cuda_device, block_move).await;
1014 }
1015 }
1016 } else if device_id.starts_with("metal:") {
1017 #[cfg(all(feature = "metal", target_os = "macos", target_arch = "aarch64"))]
1018 {
1019 if let Some(metal_device) = metal_devices.get(device_id) {
1020 return Self::execute_metal_block_move(metal_device, block_move).await;
1021 }
1022 }
1023 }
1024
1025 Self::execute_generic_block_move(memory_manager, block_move).await
1027 }
1028
1029 #[cfg(feature = "cuda")]
1031 #[allow(unused_unsafe)]
1032 async fn execute_cuda_block_move(
1033 _cuda_device: &SciRs2CudaDevice,
1034 _block_move: &BlockMove,
1035 ) -> BackendResult<()> {
1036 Err(BackendError::BackendError(
1039 "CUDA block move not yet implemented - requires scirs2_cuda memory operations"
1040 .to_string(),
1041 ))
1042 }
1043
1044 #[cfg(all(feature = "metal", target_os = "macos", target_arch = "aarch64"))]
1046 #[allow(unused_unsafe)]
1047 async fn execute_metal_block_move(
1048 metal_device: &SciRs2MetalDevice,
1049 block_move: &BlockMove,
1050 ) -> BackendResult<()> {
1051 unsafe {
1052 scirs2_metal::memory::copy_device_to_device(
1053 metal_device,
1054 block_move.from_address as *const u8,
1055 block_move.to_address as *mut u8,
1056 block_move.size,
1057 )
1058 .map_err(|e| BackendError::BackendError(format!("Metal block move failed: {}", e)))?;
1059 }
1060 Ok(())
1061 }
1062
1063 #[allow(dead_code)]
1065 async fn execute_generic_block_move(
1066 _memory_manager: &Arc<dyn MemoryManager>,
1067 block_move: &BlockMove,
1068 ) -> BackendResult<()> {
1069 unsafe {
1072 std::ptr::copy_nonoverlapping(
1073 block_move.from_address as *const u8,
1074 block_move.to_address as *mut u8,
1075 block_move.size,
1076 );
1077 }
1078 Ok(())
1079 }
1080
1081 #[allow(dead_code)]
1083 async fn execute_block_merge(
1084 block_merge: &BlockMerge,
1085 _memory_manager: &Arc<dyn MemoryManager>,
1086 ) -> BackendResult<()> {
1087 if block_merge.left_address + block_merge.left_size != block_merge.right_address {
1095 return Err(BackendError::InvalidArgument(
1096 "Blocks are not adjacent and cannot be merged".to_string(),
1097 ));
1098 }
1099
1100 Ok(())
1102 }
1103
1104 pub fn check_fragmentation_status(&self) -> HashMap<String, FragmentationInfo> {
1106 let mut status = HashMap::new();
1107
1108 for (device_id, memory_manager) in &self.memory_managers {
1109 let fragmentation_info = memory_manager.fragmentation_info();
1110 status.insert(device_id.clone(), fragmentation_info);
1111 }
1112
1113 status
1114 }
1115
1116 pub async fn auto_defragmentation_check(&self) {
1118 for (device_id, policy) in &self.policies {
1119 if !policy.enable_background {
1120 continue;
1121 }
1122
1123 let memory_manager = self
1124 .memory_managers
1125 .get(device_id)
1126 .expect("device_id should exist in memory_managers");
1127 let fragmentation_info = memory_manager.fragmentation_info();
1128
1129 if fragmentation_info.overall_fragmentation > policy.auto_trigger_threshold {
1131 let request = DefragmentationRequest {
1132 device_id: device_id.clone(),
1133 priority: DefragmentationPriority::Low,
1134 strategy: policy.strategy,
1135 force: false,
1136 };
1137
1138 let _ = self.task_queue.send(request);
1139 }
1140
1141 if fragmentation_info.overall_fragmentation > policy.emergency_threshold {
1143 let request = DefragmentationRequest {
1144 device_id: device_id.clone(),
1145 priority: DefragmentationPriority::Critical,
1146 strategy: DefragmentationStrategy::FullCompaction,
1147 force: true,
1148 };
1149
1150 let _ = self.task_queue.send(request);
1151 }
1152 }
1153 }
1154}
1155
impl Default for DefragmentationManager {
    // NOTE(review): with `feature = "async"`, `new()` spawns a task via
    // `tokio::spawn` and will panic when no tokio runtime is active —
    // `default()` inherits that requirement.
    fn default() -> Self {
        Self::new()
    }
}
1161
impl Drop for DefragmentationManager {
    fn drop(&mut self) {
        // Tear down the background worker when the manager goes away.
        if let Some(handle) = self.background_handle.take() {
            // Request cancellation of the tokio task; abort() does not wait
            // for the task to finish unwinding.
            #[cfg(feature = "async")]
            handle.abort();

            #[cfg(not(feature = "async"))]
            {
                // std threads cannot be aborted; dropping the JoinHandle
                // detaches the thread (if any) to finish on its own.
                drop(handle);
            }
        }
    }
}
1177
1178pub mod utils {
1180 use super::*;
1181
1182 pub fn recommend_strategy(fragmentation_info: &FragmentationInfo) -> DefragmentationStrategy {
1184 match fragmentation_info.severity_level() {
1185 FragmentationSeverity::Low => DefragmentationStrategy::CoalesceOnly,
1186 FragmentationSeverity::Medium => {
1187 if fragmentation_info.free_blocks > 20 {
1188 DefragmentationStrategy::Incremental
1189 } else {
1190 DefragmentationStrategy::SmallBlocksOnly
1191 }
1192 }
1193 FragmentationSeverity::High => DefragmentationStrategy::LargeBlocksFirst,
1194 FragmentationSeverity::Critical => DefragmentationStrategy::FullCompaction,
1195 }
1196 }
1197
1198 pub fn optimal_policy_for_device(device_type: DeviceType) -> DefragmentationPolicy {
1200 match device_type {
1201 DeviceType::Cuda(_) => DefragmentationPolicy {
1202 auto_trigger_threshold: 0.5,
1203 min_interval_ms: 5_000,
1204 max_duration_ms: 2_000,
1205 strategy: DefragmentationStrategy::Incremental,
1206 enable_background: true,
1207 priority: DefragmentationPriority::Low,
1208 pause_allocations: false,
1209 emergency_threshold: 0.8,
1210 },
1211 DeviceType::Metal(_) => DefragmentationPolicy {
1212 auto_trigger_threshold: 0.6,
1213 min_interval_ms: 10_000,
1214 max_duration_ms: 3_000,
1215 strategy: DefragmentationStrategy::SmallBlocksOnly,
1216 enable_background: true,
1217 priority: DefragmentationPriority::Low,
1218 pause_allocations: false,
1219 emergency_threshold: 0.85,
1220 },
1221 DeviceType::Wgpu(_) => DefragmentationPolicy {
1222 auto_trigger_threshold: 0.7,
1223 min_interval_ms: 15_000,
1224 max_duration_ms: 5_000,
1225 strategy: DefragmentationStrategy::CoalesceOnly,
1226 enable_background: false, priority: DefragmentationPriority::Low,
1228 pause_allocations: true,
1229 emergency_threshold: 0.9,
1230 },
1231 DeviceType::Cpu => DefragmentationPolicy {
1232 auto_trigger_threshold: 0.4,
1233 min_interval_ms: 1_000,
1234 max_duration_ms: 1_000,
1235 strategy: DefragmentationStrategy::FullCompaction,
1236 enable_background: true,
1237 priority: DefragmentationPriority::Normal,
1238 pause_allocations: false,
1239 emergency_threshold: 0.7,
1240 },
1241 }
1242 }
1243
1244 pub fn calculate_memory_waste(fragmentation_info: &FragmentationInfo) -> usize {
1246 let _total_memory =
1247 fragmentation_info.total_free_memory + fragmentation_info.total_allocated_memory;
1248 let potential_free = fragmentation_info.largest_free_block;
1249 let actual_free = fragmentation_info.total_free_memory;
1250
1251 if actual_free > potential_free {
1252 actual_free - potential_free
1253 } else {
1254 0
1255 }
1256 }
1257
1258 pub fn estimate_compaction_benefit(
1260 fragmentation_info: &FragmentationInfo,
1261 strategy: DefragmentationStrategy,
1262 ) -> f32 {
1263 let base_benefit = match strategy {
1264 DefragmentationStrategy::FullCompaction => 0.8,
1265 DefragmentationStrategy::Incremental => 0.4,
1266 DefragmentationStrategy::SmallBlocksOnly => 0.3,
1267 DefragmentationStrategy::LargeBlocksFirst => 0.5,
1268 DefragmentationStrategy::CoalesceOnly => 0.2,
1269 DefragmentationStrategy::Generational => 0.3,
1270 };
1271
1272 let fragmentation_factor = fragmentation_info.overall_fragmentation;
1274 base_benefit * fragmentation_factor
1275 }
1276}
1277
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_memory_block_creation() {
        // A freshly constructed block carries its constructor arguments and
        // defaults to the lowest non-zero move priority.
        let b = MemoryBlock::new(0x1000, 1024, true);
        assert_eq!(b.address, 0x1000);
        assert_eq!(b.size, 1024);
        assert!(b.allocated);
        assert_eq!(b.move_priority, 1);
    }

    #[test]
    fn test_memory_block_adjacency() {
        let first = MemoryBlock::new(0x1000, 1024, false);
        // 0x1000 + 1024 == 0x1400, so `touching` begins exactly where
        // `first` ends; `distant` leaves a gap.
        let touching = MemoryBlock::new(0x1400, 512, false);
        let distant = MemoryBlock::new(0x2000, 256, false);

        assert!(first.is_adjacent_to(&touching));
        assert!(!first.is_adjacent_to(&distant));
        assert!(first.can_merge_with(&touching));
    }

    #[test]
    fn test_memory_block_move_cost() {
        let mut block = MemoryBlock::new(0x1000, 1024, true);
        block.access_frequency = 0.5;
        block.age = Duration::from_secs(3600);
        let baseline_cost = block.move_cost();
        assert!(baseline_cost > 0.0);

        // A more frequently accessed block must be costlier to relocate.
        block.access_frequency = 1.0;
        let hot_cost = block.move_cost();
        assert!(hot_cost > baseline_cost);
    }

    #[test]
    fn test_memory_block_hot_cold_classification() {
        let mut block = MemoryBlock::new(0x1000, 1024, true);

        // With no recorded accesses a block starts out cold.
        assert!(!block.is_hot());
        assert!(block.is_cold());

        // Enough repeated accesses should flip it to hot.
        for _ in 0..10 {
            block.record_access();
        }
        assert!(block.is_hot());
        assert!(!block.is_cold());
    }

    #[test]
    fn test_memory_layout_fragmentation_calculation() {
        // Alternating allocated/free blocks: two of each kind.
        let blocks = vec![
            MemoryBlock::new(0x1000, 1024, true),
            MemoryBlock::new(0x1400, 512, false),
            MemoryBlock::new(0x1600, 1024, true),
            MemoryBlock::new(0x1A00, 256, false),
        ];

        let layout = MemoryLayout::new(blocks, 4096, 0x1000);
        let info = layout.calculate_fragmentation();

        assert_eq!(info.free_blocks, 2);
        assert_eq!(info.allocated_blocks, 2);
        assert_eq!(info.largest_free_block, 512);
        assert_eq!(info.total_free_memory, 768); // 512 + 256
        assert_eq!(info.total_allocated_memory, 2048); // 1024 + 1024
    }

    #[test]
    fn test_memory_layout_coalescable_blocks() {
        // The first two free blocks are adjacent (0x1000 + 1024 == 0x1400);
        // the last free block is isolated behind an allocated one.
        let blocks = vec![
            MemoryBlock::new(0x1000, 1024, false),
            MemoryBlock::new(0x1400, 512, false),
            MemoryBlock::new(0x1600, 1024, true),
            MemoryBlock::new(0x1A00, 256, false),
        ];

        let layout = MemoryLayout::new(blocks, 4096, 0x1000);
        let coalescable = layout.find_coalescable_blocks();

        // Exactly one mergeable pair: indices 0 and 1.
        assert_eq!(coalescable.len(), 1);
        assert_eq!(coalescable[0], (0, 1));
    }

    #[test]
    fn test_compaction_plan_creation() {
        let blocks = vec![
            MemoryBlock::new(0x1000, 1024, true),
            MemoryBlock::new(0x1400, 512, false),
            MemoryBlock::new(0x1600, 256, true),
            MemoryBlock::new(0x1700, 768, false),
        ];

        let layout = MemoryLayout::new(blocks, 4096, 0x1000);
        let plan = layout.create_compaction_plan(DefragmentationStrategy::FullCompaction);

        // Full compaction of a fragmented layout must plan some work and
        // estimate a non-zero duration for it.
        assert!(!plan.moves.is_empty() || !plan.merges.is_empty());
        assert!(plan.estimated_duration > Duration::from_nanos(0));
    }

    #[test]
    fn test_compaction_plan_worthwhile() {
        let plan = CompactionPlan {
            moves: vec![BlockMove {
                from_address: 0x2000,
                to_address: 0x1000,
                size: 1024,
                block_index: 0,
                estimated_cost: 100.0,
            }],
            merges: Vec::new(),
            estimated_duration: Duration::from_millis(10),
            expected_fragmentation_improvement: 0.15,
        };

        assert!(plan.is_worthwhile());
        assert_eq!(plan.total_bytes_to_move(), 1024);
        assert!(plan.performance_impact() > 0.0);
    }

    #[test]
    fn test_defragmentation_manager_creation() {
        // A fresh test manager starts with no registered devices, no
        // policies, empty status, and zeroed statistics.
        let manager = DefragmentationManager::new_for_test();
        assert!(manager.memory_managers.is_empty());
        assert!(manager.policies.is_empty());

        assert!(manager.get_status().is_empty());
        assert_eq!(manager.get_stats().total_operations, 0);
    }

    #[test]
    fn test_defragmentation_stats() {
        // Struct-update syntax instead of field-by-field mutation.
        let stats = DefragmentationStats {
            total_operations: 100,
            failed_operations: 5,
            cancelled_operations: 3,
            total_time: Duration::from_secs(50),
            total_bytes_moved: 1024 * 1024 * 1024, // 1 GiB
            ..Default::default()
        };

        // 100 ops with 5 failed + 3 cancelled => 92 successes.
        assert_eq!(stats.success_rate(), 0.92);
        // 50 s over 100 operations => 500 ms average.
        assert_eq!(stats.average_operation_time(), Duration::from_millis(500));
        assert!(stats.throughput() > 0.0);
    }

    #[test]
    fn test_utils_recommend_strategy() {
        // Lightly fragmented heaps only warrant cheap coalescing.
        let low = FragmentationInfo {
            overall_fragmentation: 0.1,
            ..Default::default()
        };
        assert_eq!(
            utils::recommend_strategy(&low),
            DefragmentationStrategy::CoalesceOnly
        );

        // Heavy fragmentation calls for a full compaction pass.
        let high = FragmentationInfo {
            overall_fragmentation: 0.8,
            free_blocks: 5,
            allocated_blocks: 10,
            ..Default::default()
        };
        assert_eq!(
            utils::recommend_strategy(&high),
            DefragmentationStrategy::FullCompaction
        );
    }

    #[test]
    fn test_utils_optimal_policy_for_device() {
        let cuda = utils::optimal_policy_for_device(DeviceType::Cuda(0));
        assert_eq!(cuda.auto_trigger_threshold, 0.5);
        assert!(cuda.enable_background);

        // WebGPU is more conservative: higher threshold, no background work.
        let wgpu = utils::optimal_policy_for_device(DeviceType::Wgpu(0));
        assert_eq!(wgpu.auto_trigger_threshold, 0.7);
        assert!(!wgpu.enable_background);
    }

    #[test]
    fn test_utils_calculate_memory_waste() {
        let info = FragmentationInfo {
            total_free_memory: 1000,
            total_allocated_memory: 2000,
            largest_free_block: 600,
            ..Default::default()
        };

        // 1000 free total minus the 600-byte largest contiguous block
        // leaves 400 bytes of unusable, fragmented free space.
        assert_eq!(utils::calculate_memory_waste(&info), 400);
    }

    #[test]
    fn test_utils_estimate_compaction_benefit() {
        let info = FragmentationInfo {
            overall_fragmentation: 0.5,
            ..Default::default()
        };

        // FullCompaction: 0.8 base * 0.5 fragmentation = 0.4.
        let full =
            utils::estimate_compaction_benefit(&info, DefragmentationStrategy::FullCompaction);
        assert_eq!(full, 0.4);

        // CoalesceOnly: 0.2 base * 0.5 fragmentation = 0.1.
        let coalesce =
            utils::estimate_compaction_benefit(&info, DefragmentationStrategy::CoalesceOnly);
        assert_eq!(coalesce, 0.1);
    }
}