stormdl_segment/
rebalancer.rs1use crate::SegmentManager;
2use std::sync::Arc;
3use stormdl_core::SegmentStatus;
4
5pub struct Rebalancer {
6 manager: Arc<SegmentManager>,
7 slow_threshold_pct: f64,
8 min_segment_size: u64,
9 max_segments: usize,
10}
11
12impl Rebalancer {
13 pub fn new(manager: Arc<SegmentManager>) -> Self {
14 Self {
15 manager,
16 slow_threshold_pct: 0.2,
17 min_segment_size: 256 * 1024,
18 max_segments: 32,
19 }
20 }
21
22 pub fn with_threshold(manager: Arc<SegmentManager>, slow_threshold_pct: f64) -> Self {
23 Self {
24 manager,
25 slow_threshold_pct,
26 min_segment_size: 256 * 1024,
27 max_segments: 32,
28 }
29 }
30
31 pub fn with_config(
32 manager: Arc<SegmentManager>,
33 slow_threshold_pct: f64,
34 min_segment_size: u64,
35 max_segments: usize,
36 ) -> Self {
37 Self {
38 manager,
39 slow_threshold_pct,
40 min_segment_size,
41 max_segments,
42 }
43 }
44
45 pub fn check_and_rebalance(&self) -> Vec<usize> {
46 self.check_and_rebalance_with_bdp(None)
47 }
48
49 pub fn check_and_rebalance_with_bdp(&self, bdp: Option<u64>) -> Vec<usize> {
50 let segments = self.manager.get_segments();
51 let avg_speed = self.manager.average_speed();
52
53 if avg_speed <= 0.0 {
54 return vec![];
55 }
56
57 let current_count = segments.len();
58 if current_count >= self.max_segments {
59 return vec![];
60 }
61
62 let slow_threshold = self.calculate_threshold(avg_speed, bdp);
63 let mut new_segments = Vec::new();
64
65 for segment in &segments {
66 if segment.status != SegmentStatus::Active {
67 continue;
68 }
69
70 let remaining = segment.remaining();
71 if remaining < self.min_segment_size * 2 {
72 continue;
73 }
74
75 if segment.speed < slow_threshold && remaining > 0 {
76 if current_count + new_segments.len() >= self.max_segments {
77 break;
78 }
79
80 if let Some(new_seg) = self.manager.split_segment(segment.id) {
81 tracing::info!(
82 "Split slow segment {} (speed: {:.2} KB/s, avg: {:.2} KB/s, threshold: {:.2} KB/s) -> new segment {}",
83 segment.id,
84 segment.speed / 1024.0,
85 avg_speed / 1024.0,
86 slow_threshold / 1024.0,
87 new_seg.id
88 );
89 new_segments.push(new_seg.id);
90 }
91 }
92 }
93
94 new_segments
95 }
96
97 fn calculate_threshold(&self, avg_speed: f64, bdp: Option<u64>) -> f64 {
98 let base_threshold = avg_speed * self.slow_threshold_pct;
99
100 if let Some(bdp) = bdp {
101 let tcp_window = 65536.0;
102 let bdp_factor = (bdp as f64 / tcp_window).sqrt();
103 let adjusted_pct = (self.slow_threshold_pct * bdp_factor).min(0.5);
104 avg_speed * adjusted_pct
105 } else {
106 base_threshold
107 }
108 }
109
110 pub fn optimal_segments_from_bdp(&self, bdp: u64, file_size: u64) -> usize {
111 let tcp_window = 65536u64;
112 let min_connections = ((bdp as f64) / (tcp_window as f64)).ceil() as usize;
113
114 let size_based_max = match file_size {
115 0..=1_000_000 => 1,
116 1_000_001..=10_000_000 => 4,
117 10_000_001..=100_000_000 => 8,
118 100_000_001..=1_000_000_000 => 16,
119 _ => 32,
120 };
121
122 min_connections.clamp(1, size_based_max.min(self.max_segments))
123 }
124}