1use std::collections::VecDeque;
8use std::sync::{Arc, Mutex};
9use std::time::{Duration, Instant};
10
11use super::prefetch::{PrefetchConfig, PrefetchStats};
12
13const DEFAULT_SAMPLING_INTERVAL: Duration = Duration::from_millis(500);
15
16const DEFAULT_MEMORY_PRESSURE_THRESHOLD: f64 = 0.85;
18
19const DEFAULT_CPU_LOAD_THRESHOLD: f64 = 0.85;
21
22const DEFAULT_IO_PRESSURE_THRESHOLD: f64 = 0.85;
24
25const MIN_ADJUSTMENT_INTERVAL: Duration = Duration::from_secs(1);
27
28const MAX_SNAPSHOT_HISTORY: usize = 20;
30
31#[derive(Debug, Clone, Copy, PartialEq, Eq)]
33pub enum ResourceType {
34 CPU,
36
37 Memory,
39
40 IO,
42
43 Combined,
45}
46
47#[derive(Debug, Clone)]
49pub struct ResourceSnapshot {
50 pub timestamp: Instant,
52
53 pub cpu_usage: f64,
55
56 pub memory_usage: u64,
58
59 pub memory_available: u64,
61
62 pub io_ops_per_sec: u64,
64
65 pub io_bytes_per_sec: u64,
67}
68
69impl ResourceSnapshot {
70 pub fn memory_pressure(&self) -> f64 {
72 if self.memory_available == 0 {
73 0.0 } else {
75 self.memory_usage as f64 / (self.memory_usage + self.memory_available) as f64
76 }
77 }
78
79 pub fn combined_pressure(&self) -> f64 {
81 const CPU_WEIGHT: f64 = 0.4;
83 const MEMORY_WEIGHT: f64 = 0.4;
84 const IO_WEIGHT: f64 = 0.2;
85
86 let io_pressure = if self.io_bytes_per_sec > 100_000_000 {
88 0.9
90 } else if self.io_bytes_per_sec > 50_000_000 {
91 0.7
93 } else if self.io_bytes_per_sec > 10_000_000 {
94 0.5
96 } else {
97 0.2
99 };
100
101 CPU_WEIGHT * self.cpu_usage
103 + MEMORY_WEIGHT * self.memory_pressure()
104 + IO_WEIGHT * io_pressure
105 }
106}
107
108#[derive(Debug, Clone)]
110pub struct ResourceAwareConfig {
111 pub sampling_interval: Duration,
113
114 pub memory_pressure_threshold: f64,
116
117 pub cpu_load_threshold: f64,
119
120 pub io_pressure_threshold: f64,
122
123 pub adjustment_interval: Duration,
125
126 pub auto_adjust: bool,
128
129 pub disable_under_pressure: bool,
131
132 pub min_prefetch_count: usize,
134
135 pub max_prefetch_count: usize,
137}
138
139impl Default for ResourceAwareConfig {
140 fn default() -> Self {
141 Self {
142 sampling_interval: DEFAULT_SAMPLING_INTERVAL,
143 memory_pressure_threshold: DEFAULT_MEMORY_PRESSURE_THRESHOLD,
144 cpu_load_threshold: DEFAULT_CPU_LOAD_THRESHOLD,
145 io_pressure_threshold: DEFAULT_IO_PRESSURE_THRESHOLD,
146 adjustment_interval: MIN_ADJUSTMENT_INTERVAL,
147 auto_adjust: true,
148 disable_under_pressure: true,
149 min_prefetch_count: 1,
150 max_prefetch_count: 8,
151 }
152 }
153}
154
155#[derive(Debug, Clone, Default)]
157pub struct ResourceAwareConfigBuilder {
158 config: ResourceAwareConfig,
159}
160
161impl ResourceAwareConfigBuilder {
162 pub fn new() -> Self {
164 Self::default()
165 }
166
167 pub const fn with_sampling_interval(mut self, interval: Duration) -> Self {
169 self.config.sampling_interval = interval;
170 self
171 }
172
173 pub fn with_memory_pressure_threshold(mut self, threshold: f64) -> Self {
175 self.config.memory_pressure_threshold = threshold.clamp(0.0, 1.0);
176 self
177 }
178
179 pub fn with_cpu_load_threshold(mut self, threshold: f64) -> Self {
181 self.config.cpu_load_threshold = threshold.clamp(0.0, 1.0);
182 self
183 }
184
185 pub fn with_io_pressure_threshold(mut self, threshold: f64) -> Self {
187 self.config.io_pressure_threshold = threshold.clamp(0.0, 1.0);
188 self
189 }
190
191 pub fn with_adjustment_interval(mut self, interval: Duration) -> Self {
193 self.config.adjustment_interval = std::cmp::max(interval, MIN_ADJUSTMENT_INTERVAL);
194 self
195 }
196
197 pub fn with_auto_adjust(mut self, autoadjust: bool) -> Self {
199 self.config.auto_adjust = autoadjust;
200 self
201 }
202
203 pub const fn with_disable_under_pressure(mut self, disable: bool) -> Self {
205 self.config.disable_under_pressure = disable;
206 self
207 }
208
209 pub const fn with_min_prefetch_count(mut self, count: usize) -> Self {
211 self.config.min_prefetch_count = count;
212 self
213 }
214
215 pub const fn with_max_prefetch_count(mut self, count: usize) -> Self {
217 self.config.max_prefetch_count = count;
218 self
219 }
220
221 pub fn build(self) -> ResourceAwareConfig {
223 self.config
224 }
225}
226
227pub struct ResourceMonitor {
229 config: ResourceAwareConfig,
231
232 snapshots: VecDeque<ResourceSnapshot>,
234
235 last_sample: Instant,
237
238 last_adjustment: Instant,
240
241 under_pressure: bool,
243
244 sys_info: Box<dyn SystemInfo + Send + Sync>,
246}
247
248impl ResourceMonitor {
249 pub fn new(config: ResourceAwareConfig) -> Self {
251 Self {
252 config,
253 snapshots: VecDeque::with_capacity(MAX_SNAPSHOT_HISTORY),
254 last_sample: Instant::now(),
255 last_adjustment: Instant::now(),
256 under_pressure: false,
257 sys_info: Box::new(DefaultSystemInfo),
258 }
259 }
260
261 pub fn take_snapshot(&mut self) -> ResourceSnapshot {
263 let cpu_usage = self.sys_info.get_cpu_usage();
265
266 let (memory_usage, memory_available) = self.sys_info.get_memoryinfo();
268
269 let (io_ops_per_sec, io_bytes_per_sec) = self.sys_info.get_io_stats();
271
272 let snapshot = ResourceSnapshot {
274 timestamp: Instant::now(),
275 cpu_usage,
276 memory_usage,
277 memory_available,
278 io_ops_per_sec,
279 io_bytes_per_sec,
280 };
281
282 self.snapshots.push_back(snapshot.clone());
284 while self.snapshots.len() > MAX_SNAPSHOT_HISTORY {
285 self.snapshots.pop_front();
286 }
287
288 self.last_sample = Instant::now();
290
291 snapshot
292 }
293
294 pub fn should_take_snapshot(&self) -> bool {
296 self.last_sample.elapsed() >= self.config.sampling_interval
297 }
298
299 pub fn is_under_pressure(&mut self) -> bool {
301 if self.should_take_snapshot() {
303 self.take_snapshot();
304 }
305
306 if self.snapshots.is_empty() {
308 return false;
309 }
310
311 let latest = self.snapshots.back().expect("Operation failed");
313
314 let cpu_pressure = latest.cpu_usage > self.config.cpu_load_threshold;
316 let memory_pressure = latest.memory_pressure() > self.config.memory_pressure_threshold;
317
318 let io_pressure = if latest.io_bytes_per_sec > 100_000_000 {
320 true
322 } else {
323 false
324 };
325
326 self.under_pressure = cpu_pressure || memory_pressure || io_pressure;
328
329 self.under_pressure
330 }
331
332 pub fn count(&mut self, base_prefetchcount: usize) -> usize {
334 if !self.config.auto_adjust {
335 return base_prefetchcount;
336 }
337
338 if self.should_take_snapshot() {
340 self.take_snapshot();
341 }
342
343 if self.snapshots.is_empty() {
345 return base_prefetchcount;
346 }
347
348 let latest = self.snapshots.back().expect("Operation failed");
350
351 let pressure = latest.combined_pressure();
353
354 if pressure > 0.90 && self.config.disable_under_pressure {
356 self.config.min_prefetch_count
358 } else if pressure > 0.75 {
359 std::cmp::max(
361 self.config.min_prefetch_count,
362 (base_prefetchcount as f64 * 0.5).round() as usize,
363 )
364 } else if pressure > 0.6 {
365 std::cmp::max(
367 self.config.min_prefetch_count,
368 (base_prefetchcount as f64 * 0.75).round() as usize,
369 )
370 } else if pressure < 0.3 {
371 std::cmp::min(
373 self.config.max_prefetch_count,
374 (base_prefetchcount as f64 * 1.5).round() as usize,
375 )
376 } else {
377 base_prefetchcount
379 }
380 }
381
382 pub fn adjust_prefetch_config(&mut self, config: &mut PrefetchConfig) -> bool {
384 if !self.config.auto_adjust
385 || self.last_adjustment.elapsed() < self.config.adjustment_interval
386 {
387 return false;
388 }
389
390 let optimal_prefetch_count = self.get_optimal_prefetch_count(config.prefetch_count);
392
393 if optimal_prefetch_count != config.prefetch_count {
395 config.prefetch_count = optimal_prefetch_count;
396 self.last_adjustment = Instant::now();
397 return true;
398 }
399
400 false
401 }
402
403 pub fn get_optimal_prefetch_count(&mut self, base_prefetchcount: usize) -> usize {
405 self.count(base_prefetchcount)
406 }
407
408 pub fn get_latest_snapshot(&self) -> Option<ResourceSnapshot> {
410 self.snapshots.back().cloned()
411 }
412
413 pub fn get_resource_summary(&self) -> ResourceSummary {
415 if self.snapshots.is_empty() {
416 return ResourceSummary::default();
417 }
418
419 let mut cpu_sum = 0.0;
421 let mut memory_pressure_sum = 0.0;
422 let mut io_bytes_sum = 0;
423
424 for snapshot in &self.snapshots {
425 cpu_sum += snapshot.cpu_usage;
426 memory_pressure_sum += snapshot.memory_pressure();
427 io_bytes_sum += snapshot.io_bytes_per_sec;
428 }
429
430 let count = self.snapshots.len();
431 let avg_cpu = cpu_sum / count as f64;
432 let avg_memory_pressure = memory_pressure_sum / count as f64;
433 let avg_io_bytes = io_bytes_sum / count as u64;
434
435 let trend_duration = if count >= 2 {
437 let oldest = &self.snapshots[0];
438 let newest = self.snapshots.back().expect("Operation failed");
439
440 newest.timestamp.duration_since(oldest.timestamp)
441 } else {
442 Duration::from_secs(0)
443 };
444
445 ResourceSummary {
446 avg_cpu_usage: avg_cpu,
447 avg_memory_pressure,
448 avg_io_bytes_per_sec: avg_io_bytes,
449 combined_pressure: self
450 .snapshots
451 .back()
452 .expect("Operation failed")
453 .combined_pressure(),
454 snapshot_count: count,
455 duration: trend_duration,
456 under_pressure: self.under_pressure,
457 }
458 }
459}
460
461#[derive(Debug, Clone)]
463pub struct ResourceSummary {
464 pub avg_cpu_usage: f64,
466
467 pub avg_memory_pressure: f64,
469
470 pub avg_io_bytes_per_sec: u64,
472
473 pub combined_pressure: f64,
475
476 pub snapshot_count: usize,
478
479 pub duration: Duration,
481
482 pub under_pressure: bool,
484}
485
486impl Default for ResourceSummary {
487 fn default() -> Self {
488 Self {
489 avg_cpu_usage: 0.0,
490 avg_memory_pressure: 0.0,
491 avg_io_bytes_per_sec: 0,
492 combined_pressure: 0.0,
493 snapshot_count: 0,
494 duration: Duration::from_secs(0),
495 under_pressure: false,
496 }
497 }
498}
499
500pub trait SystemInfo {
502 fn get_cpu_usage(&self) -> f64;
504
505 fn get_memoryinfo(&self) -> (u64, u64);
507
508 fn get_io_stats(&self) -> (u64, u64);
510}
511
512pub struct DefaultSystemInfo;
514
515impl SystemInfo for DefaultSystemInfo {
516 fn get_cpu_usage(&self) -> f64 {
517 #[cfg(feature = "sysinfo")]
519 {
520 use sysinfo::System;
521 let mut system = System::new_all();
522 system.refresh_cpu_all();
523
524 let cpu_usage: f64 = system
526 .cpus()
527 .iter()
528 .map(|cpu| cpu.cpu_usage() as f64 / 100.0)
529 .sum();
530 cpu_usage / system.cpus().len() as f64
531 }
532
533 #[cfg(not(feature = "sysinfo"))]
535 {
536 #[cfg(all(
538 target_family = "unix",
539 feature = "memory_compression",
540 feature = "cross_platform"
541 ))]
542 {
543 let mut loadavg = [0.0, 0.0, 0.0];
544 if unsafe { libc::getloadavg(loadavg.as_mut_ptr(), 3) } == 3 {
545 let num_cpus = num_cpus::get() as f64;
548 return (loadavg[0] / num_cpus).min(1.0);
549 }
550 }
551
552 0.5
554 }
555 }
556
557 fn get_memoryinfo(&self) -> (u64, u64) {
558 #[cfg(feature = "sysinfo")]
560 {
561 use sysinfo::System;
562 let mut system = System::new_all();
563 system.refresh_memory();
564
565 (
566 system.used_memory() * 1024,
567 system.available_memory() * 1024,
568 )
569 }
570
571 #[cfg(not(feature = "sysinfo"))]
573 {
574 #[cfg(feature = "sysinfo")]
576 {
577 if let Ok(mem) = sys_info::mem_info() {
578 let used = (mem.total - mem.free) * 1024;
579 let available = mem.free * 1024;
580 return (used, available);
581 }
582 }
583
584 (4 * 1024 * 1024 * 1024, 4 * 1024 * 1024 * 1024) }
588 }
589
590 fn get_io_stats(&self) -> (u64, u64) {
591 #[cfg(feature = "sysinfo")]
593 {
594 use sysinfo::{Disks, System};
595 let system = System::new_all();
596 let disks = Disks::new_with_refreshed_list();
597
598 let mut total_ops = 0;
600 let mut total_bytes = 0;
601
602 for disk in disks.list() {
603 total_ops += 1; total_bytes += disk.available_space();
608 }
609
610 (total_ops, total_bytes)
611 }
612
613 #[cfg(not(feature = "sysinfo"))]
615 {
616 (10, 1024 * 1024) }
618 }
619}
620
621pub struct ResourceAwarePrefetcher {
623 monitor: ResourceMonitor,
625
626 baseconfig: PrefetchConfig,
628
629 currentconfig: PrefetchConfig,
631
632 enabled: bool,
634
635 performance_stats: Arc<Mutex<PerformanceStats>>,
637
638 last_stats_update: Instant,
640}
641
642#[derive(Debug, Clone, Default)]
644pub struct PerformanceStats {
645 pub hit_rate: f64,
647
648 pub prefetch_latency_ns: f64,
650
651 pub non_prefetch_latency_ns: f64,
653
654 pub prefetch_count: usize,
656
657 pub access_count: usize,
659
660 pub resource_snapshots: Vec<(Instant, ResourceSummary)>,
662}
663
664impl ResourceAwarePrefetcher {
665 pub fn config(baseconfig: PrefetchConfig, resourceconfig: ResourceAwareConfig) -> Self {
667 Self {
668 monitor: ResourceMonitor::new(resourceconfig),
669 baseconfig: baseconfig.clone(),
670 currentconfig: baseconfig,
671 enabled: true,
672 performance_stats: Arc::new(Mutex::new(PerformanceStats::default())),
673 last_stats_update: Instant::now(),
674 }
675 }
676
677 pub fn update_config(&mut self) -> bool {
679 if !self.enabled {
680 return false;
681 }
682
683 let mut config = self.currentconfig.clone();
685 let changed = self.monitor.adjust_prefetch_config(&mut config);
686
687 if changed {
688 self.currentconfig = config;
689
690 if let Some(_snapshot) = self.monitor.get_latest_snapshot() {
692 let summary = self.monitor.get_resource_summary();
693 if let Ok(mut stats) = self.performance_stats.lock() {
694 stats.resource_snapshots.push((Instant::now(), summary));
695
696 while stats.resource_snapshots.len() > 10 {
698 stats.resource_snapshots.remove(0);
699 }
700 }
701 }
702 }
703
704 changed
705 }
706
707 pub fn record_prefetch_performance(
709 &mut self,
710 is_prefetched: bool,
711 latency_ns: f64,
712 prefetch_stats: &PrefetchStats,
713 ) {
714 if let Ok(mut stats) = self.performance_stats.lock() {
715 stats.hit_rate = prefetch_stats.hit_rate;
717 stats.prefetch_count = prefetch_stats.prefetch_count;
718 stats.access_count = prefetch_stats.prefetch_hits + prefetch_stats.prefetch_misses;
719
720 if is_prefetched {
722 if stats.prefetch_latency_ns == 0.0 {
724 stats.prefetch_latency_ns = latency_ns;
725 } else {
726 stats.prefetch_latency_ns = stats.prefetch_latency_ns * 0.9 + latency_ns * 0.1;
727 }
728 } else {
729 if stats.non_prefetch_latency_ns == 0.0 {
731 stats.non_prefetch_latency_ns = latency_ns;
732 } else {
733 stats.non_prefetch_latency_ns =
734 stats.non_prefetch_latency_ns * 0.9 + latency_ns * 0.1;
735 }
736 }
737 }
738
739 if self.last_stats_update.elapsed() >= Duration::from_secs(5) {
741 self.last_stats_update = Instant::now();
742
743 let summary = self.monitor.get_resource_summary();
745 if let Ok(mut stats) = self.performance_stats.lock() {
746 stats.resource_snapshots.push((Instant::now(), summary));
747
748 while stats.resource_snapshots.len() > 10 {
750 stats.resource_snapshots.remove(0);
751 }
752 }
753 }
754 }
755
756 pub fn get_currentconfig(&self) -> PrefetchConfig {
758 self.currentconfig.clone()
759 }
760
761 pub fn getbaseconfig(&self) -> PrefetchConfig {
763 self.baseconfig.clone()
764 }
765
766 pub fn take_resource_snapshot(&mut self) -> ResourceSnapshot {
768 self.monitor.take_snapshot()
769 }
770
771 pub fn get_resource_summary(&self) -> ResourceSummary {
773 self.monitor.get_resource_summary()
774 }
775
776 pub fn get_performance_stats(&self) -> PerformanceStats {
778 if let Ok(stats) = self.performance_stats.lock() {
779 stats.clone()
780 } else {
781 PerformanceStats::default()
782 }
783 }
784
785 pub fn is_under_pressure(&mut self) -> bool {
787 self.monitor.is_under_pressure()
788 }
789
790 pub fn set_enabled(&mut self, enabled: bool) {
792 self.enabled = enabled;
793 }
794
795 pub fn is_enabled(&self) -> bool {
797 self.enabled
798 }
799
800 pub fn get_optimal_prefetch_count(&mut self) -> usize {
802 self.monitor
803 .get_optimal_prefetch_count(self.baseconfig.prefetch_count)
804 }
805
806 pub fn reset_config(&mut self) {
808 self.currentconfig = self.baseconfig.clone();
809 }
810}
811
812#[derive(Debug, Clone)]
814#[allow(dead_code)]
815pub struct ResourceAwarePrefetchingConfig {
816 pub baseconfig: PrefetchConfig,
818
819 pub resourceconfig: ResourceAwareConfig,
821}
822
823#[allow(dead_code)]
824impl ResourceAwarePrefetchingConfig {
825 pub fn config(baseconfig: PrefetchConfig, resourceconfig: ResourceAwareConfig) -> Self {
827 Self {
828 baseconfig,
829 resourceconfig,
830 }
831 }
832
833 pub fn create_prefetcher(&self) -> ResourceAwarePrefetcher {
835 ResourceAwarePrefetcher::config(self.baseconfig.clone(), self.resourceconfig.clone())
836 }
837}
838
839#[cfg(test)]
840mod tests {
841 use super::*;
842
843 struct MockSystemInfo {
845 cpu_usage: f64,
846 memory_used: u64,
847 memory_available: u64,
848 io_ops: u64,
849 io_bytes: u64,
850 }
851
852 impl SystemInfo for MockSystemInfo {
853 fn get_cpu_usage(&self) -> f64 {
854 self.cpu_usage
855 }
856
857 fn get_memoryinfo(&self) -> (u64, u64) {
858 (self.memory_used, self.memory_available)
859 }
860
861 fn get_io_stats(&self) -> (u64, u64) {
862 (self.io_ops, self.io_bytes)
863 }
864 }
865
866 impl MockSystemInfo {
867 fn new(
868 cpu_usage: f64,
869 memory_used: u64,
870 memory_available: u64,
871 io_ops: u64,
872 io_bytes: u64,
873 ) -> Self {
874 Self {
875 cpu_usage,
876 memory_used,
877 memory_available,
878 io_ops,
879 io_bytes,
880 }
881 }
882
883 fn bytes(value: u64) -> Self {
884 Self {
885 cpu_usage: 0.0,
886 memory_used: value,
887 memory_available: 1024 * 1024 * 1024, io_ops: 0,
889 io_bytes: 0,
890 }
891 }
892 }
893
894 #[test]
895 fn test_resource_snapshot() {
896 let snapshot = ResourceSnapshot {
897 timestamp: Instant::now(),
898 cpu_usage: 0.7,
899 memory_usage: 8 * 1024 * 1024 * 1024, memory_available: 8 * 1024 * 1024 * 1024, io_ops_per_sec: 100,
902 io_bytes_per_sec: 10 * 1024 * 1024, };
904
905 assert_eq!(snapshot.memory_pressure(), 0.5); let combined = snapshot.combined_pressure();
910 assert!(combined > 0.0 && combined < 1.0);
911 }
912
913 #[test]
914 fn test_optimal_prefetch_count() {
915 let config = ResourceAwareConfig {
917 auto_adjust: true,
918 min_prefetch_count: 1,
919 max_prefetch_count: 10,
920 ..Default::default()
921 };
922
923 let mut monitor = ResourceMonitor::new(config);
924
925 monitor.sys_info = Box::new(MockSystemInfo::new(
927 0.2, 2 * 1024 * 1024 * 1024, 14 * 1024 * 1024 * 1024, 10, 1024 * 1024, ));
933
934 monitor.take_snapshot();
936
937 let base_count = 4;
939 let optimal = monitor.get_optimal_prefetch_count(base_count);
940 assert!(optimal >= base_count); monitor.sys_info = Box::new(MockSystemInfo::new(
944 0.9, 14 * 1024 * 1024 * 1024, 2 * 1024 * 1024 * 1024, 1000, 100 * 1024 * 1024, ));
950
951 monitor.take_snapshot();
953
954 let optimal = monitor.get_optimal_prefetch_count(base_count);
956 assert!(optimal <= base_count); }
958
959 #[test]
960 fn test_resource_aware_prefetcher() {
961 let baseconfig = PrefetchConfig {
963 prefetch_count: 5,
964 ..Default::default()
965 };
966
967 let resource_config = ResourceAwareConfig {
968 auto_adjust: true,
969 min_prefetch_count: 1,
970 max_prefetch_count: 10,
971 ..Default::default()
972 };
973
974 let mut prefetcher = ResourceAwarePrefetcher::config(baseconfig, resource_config);
975
976 let stats = PrefetchStats {
978 prefetch_count: 100,
979 prefetch_hits: 80,
980 prefetch_misses: 20,
981 hit_rate: 0.8,
982 };
983
984 prefetcher.record_prefetch_performance(true, 500_000.0, &stats); prefetcher.record_prefetch_performance(false, 2_000_000.0, &stats); let perf_stats = prefetcher.get_performance_stats();
989 assert_eq!(perf_stats.hit_rate, 0.8);
990 assert!(perf_stats.prefetch_latency_ns > 0.0);
991 assert!(perf_stats.non_prefetch_latency_ns > 0.0);
992 assert!(perf_stats.non_prefetch_latency_ns > perf_stats.prefetch_latency_ns);
993 }
995}