stormdl_segment/
controller.rs1use std::sync::atomic::{AtomicUsize, Ordering};
2use std::time::{Duration, Instant};
3
4pub struct AdaptiveController {
5 current_segments: AtomicUsize,
6 max_segments: usize,
7 min_segment_size: u64,
8 file_size: u64,
9 last_adjustment: parking_lot::Mutex<Instant>,
10 adjustment_interval: Duration,
11}
12
13impl AdaptiveController {
14 pub fn new(file_size: u64, initial_segments: usize) -> Self {
15 Self {
16 current_segments: AtomicUsize::new(initial_segments),
17 max_segments: 32,
18 min_segment_size: 256 * 1024,
19 file_size,
20 last_adjustment: parking_lot::Mutex::new(Instant::now()),
21 adjustment_interval: Duration::from_millis(500),
22 }
23 }
24
25 pub fn with_config(
26 file_size: u64,
27 initial_segments: usize,
28 max_segments: usize,
29 min_segment_size: u64,
30 ) -> Self {
31 Self {
32 current_segments: AtomicUsize::new(initial_segments),
33 max_segments,
34 min_segment_size,
35 file_size,
36 last_adjustment: parking_lot::Mutex::new(Instant::now()),
37 adjustment_interval: Duration::from_millis(500),
38 }
39 }
40
41 pub fn current_segments(&self) -> usize {
42 self.current_segments.load(Ordering::Relaxed)
43 }
44
45 pub fn evaluate(&self, bdp: Option<u64>, _current_speed: f64) -> Option<SegmentAdjustment> {
46 let mut last = self.last_adjustment.lock();
47 if last.elapsed() < self.adjustment_interval {
48 return None;
49 }
50
51 let current = self.current_segments.load(Ordering::Relaxed);
52 let bdp = bdp?;
53
54 let tcp_window = 65536u64;
55 let optimal = ((bdp as f64) / (tcp_window as f64)).ceil() as usize;
56 let optimal = optimal.clamp(1, self.max_segments);
57
58 if optimal <= current {
59 return None;
60 }
61
62 let remaining_segments = optimal - current;
63 let segments_to_add = remaining_segments.min(4);
64
65 let avg_segment_size = self.file_size / (current + segments_to_add) as u64;
66 if avg_segment_size < self.min_segment_size {
67 return None;
68 }
69
70 *last = Instant::now();
71 self.current_segments
72 .store(current + segments_to_add, Ordering::Relaxed);
73
74 Some(SegmentAdjustment::Split {
75 count: segments_to_add,
76 reason: AdjustmentReason::BdpIncrease { bdp, optimal },
77 })
78 }
79
80 pub fn should_split_slow_segment(
81 &self,
82 segment_speed: f64,
83 avg_speed: f64,
84 remaining_bytes: u64,
85 ) -> bool {
86 if remaining_bytes < self.min_segment_size * 2 {
87 return false;
88 }
89
90 let current = self.current_segments.load(Ordering::Relaxed);
91 if current >= self.max_segments {
92 return false;
93 }
94
95 let threshold = avg_speed * 0.3;
96 segment_speed > 0.0 && segment_speed < threshold
97 }
98
99 pub fn record_split(&self) {
100 self.current_segments.fetch_add(1, Ordering::Relaxed);
101 }
102}
103
104#[derive(Debug, Clone)]
105pub enum SegmentAdjustment {
106 Split {
107 count: usize,
108 reason: AdjustmentReason,
109 },
110 #[allow(dead_code)]
111 Merge {
112 count: usize,
113 reason: AdjustmentReason,
114 },
115}
116
/// Why a [`SegmentAdjustment`] was proposed.
#[derive(Clone, Debug)]
pub enum AdjustmentReason {
    /// The observed `bdp` calls for `optimal` segments, more than are
    /// currently running.
    BdpIncrease { bdp: u64, optimal: usize },
    /// A segment's `speed` fell below `threshold`.
    // Not constructed anywhere in this file, hence the lint exemption.
    #[allow(dead_code)]
    SlowSegment { speed: f64, threshold: f64 },
    /// A connection limit triggered the adjustment.
    // Not constructed anywhere in this file, hence the lint exemption.
    #[allow(dead_code)]
    ConnectionLimit,
}