Skip to main content

stormdl_segment/
rebalancer.rs

1use crate::SegmentManager;
2use std::sync::Arc;
3use stormdl_core::SegmentStatus;
4
5pub struct Rebalancer {
6    manager: Arc<SegmentManager>,
7    slow_threshold_pct: f64,
8    min_segment_size: u64,
9    max_segments: usize,
10}
11
12impl Rebalancer {
13    pub fn new(manager: Arc<SegmentManager>) -> Self {
14        Self {
15            manager,
16            slow_threshold_pct: 0.2,
17            min_segment_size: 256 * 1024,
18            max_segments: 32,
19        }
20    }
21
22    pub fn with_threshold(manager: Arc<SegmentManager>, slow_threshold_pct: f64) -> Self {
23        Self {
24            manager,
25            slow_threshold_pct,
26            min_segment_size: 256 * 1024,
27            max_segments: 32,
28        }
29    }
30
31    pub fn with_config(
32        manager: Arc<SegmentManager>,
33        slow_threshold_pct: f64,
34        min_segment_size: u64,
35        max_segments: usize,
36    ) -> Self {
37        Self {
38            manager,
39            slow_threshold_pct,
40            min_segment_size,
41            max_segments,
42        }
43    }
44
45    pub fn check_and_rebalance(&self) -> Vec<usize> {
46        self.check_and_rebalance_with_bdp(None)
47    }
48
49    pub fn check_and_rebalance_with_bdp(&self, bdp: Option<u64>) -> Vec<usize> {
50        let segments = self.manager.get_segments();
51        let avg_speed = self.manager.average_speed();
52
53        if avg_speed <= 0.0 {
54            return vec![];
55        }
56
57        let current_count = segments.len();
58        if current_count >= self.max_segments {
59            return vec![];
60        }
61
62        let slow_threshold = self.calculate_threshold(avg_speed, bdp);
63        let mut new_segments = Vec::new();
64
65        for segment in &segments {
66            if segment.status != SegmentStatus::Active {
67                continue;
68            }
69
70            let remaining = segment.remaining();
71            if remaining < self.min_segment_size * 2 {
72                continue;
73            }
74
75            if segment.speed < slow_threshold && remaining > 0 {
76                if current_count + new_segments.len() >= self.max_segments {
77                    break;
78                }
79
80                if let Some(new_seg) = self.manager.split_segment(segment.id) {
81                    tracing::info!(
82                        "Split slow segment {} (speed: {:.2} KB/s, avg: {:.2} KB/s, threshold: {:.2} KB/s) -> new segment {}",
83                        segment.id,
84                        segment.speed / 1024.0,
85                        avg_speed / 1024.0,
86                        slow_threshold / 1024.0,
87                        new_seg.id
88                    );
89                    new_segments.push(new_seg.id);
90                }
91            }
92        }
93
94        new_segments
95    }
96
97    fn calculate_threshold(&self, avg_speed: f64, bdp: Option<u64>) -> f64 {
98        let base_threshold = avg_speed * self.slow_threshold_pct;
99
100        if let Some(bdp) = bdp {
101            let tcp_window = 65536.0;
102            let bdp_factor = (bdp as f64 / tcp_window).sqrt();
103            let adjusted_pct = (self.slow_threshold_pct * bdp_factor).min(0.5);
104            avg_speed * adjusted_pct
105        } else {
106            base_threshold
107        }
108    }
109
110    pub fn optimal_segments_from_bdp(&self, bdp: u64, file_size: u64) -> usize {
111        let tcp_window = 65536u64;
112        let min_connections = ((bdp as f64) / (tcp_window as f64)).ceil() as usize;
113
114        let size_based_max = match file_size {
115            0..=1_000_000 => 1,
116            1_000_001..=10_000_000 => 4,
117            10_000_001..=100_000_000 => 8,
118            100_000_001..=1_000_000_000 => 16,
119            _ => 32,
120        };
121
122        min_connections.clamp(1, size_based_max.min(self.max_segments))
123    }
124}