Skip to main content

stormdl_segment/
controller.rs

1use std::sync::atomic::{AtomicUsize, Ordering};
2use std::time::{Duration, Instant};
3
4pub struct AdaptiveController {
5    current_segments: AtomicUsize,
6    max_segments: usize,
7    min_segment_size: u64,
8    file_size: u64,
9    last_adjustment: parking_lot::Mutex<Instant>,
10    adjustment_interval: Duration,
11}
12
13impl AdaptiveController {
14    pub fn new(file_size: u64, initial_segments: usize) -> Self {
15        Self {
16            current_segments: AtomicUsize::new(initial_segments),
17            max_segments: 32,
18            min_segment_size: 256 * 1024,
19            file_size,
20            last_adjustment: parking_lot::Mutex::new(Instant::now()),
21            adjustment_interval: Duration::from_millis(500),
22        }
23    }
24
25    pub fn with_config(
26        file_size: u64,
27        initial_segments: usize,
28        max_segments: usize,
29        min_segment_size: u64,
30    ) -> Self {
31        Self {
32            current_segments: AtomicUsize::new(initial_segments),
33            max_segments,
34            min_segment_size,
35            file_size,
36            last_adjustment: parking_lot::Mutex::new(Instant::now()),
37            adjustment_interval: Duration::from_millis(500),
38        }
39    }
40
41    pub fn current_segments(&self) -> usize {
42        self.current_segments.load(Ordering::Relaxed)
43    }
44
45    pub fn evaluate(&self, bdp: Option<u64>, _current_speed: f64) -> Option<SegmentAdjustment> {
46        let mut last = self.last_adjustment.lock();
47        if last.elapsed() < self.adjustment_interval {
48            return None;
49        }
50
51        let current = self.current_segments.load(Ordering::Relaxed);
52        let bdp = bdp?;
53
54        let tcp_window = 65536u64;
55        let optimal = ((bdp as f64) / (tcp_window as f64)).ceil() as usize;
56        let optimal = optimal.clamp(1, self.max_segments);
57
58        if optimal <= current {
59            return None;
60        }
61
62        let remaining_segments = optimal - current;
63        let segments_to_add = remaining_segments.min(4);
64
65        let avg_segment_size = self.file_size / (current + segments_to_add) as u64;
66        if avg_segment_size < self.min_segment_size {
67            return None;
68        }
69
70        *last = Instant::now();
71        self.current_segments
72            .store(current + segments_to_add, Ordering::Relaxed);
73
74        Some(SegmentAdjustment::Split {
75            count: segments_to_add,
76            reason: AdjustmentReason::BdpIncrease { bdp, optimal },
77        })
78    }
79
80    pub fn should_split_slow_segment(
81        &self,
82        segment_speed: f64,
83        avg_speed: f64,
84        remaining_bytes: u64,
85    ) -> bool {
86        if remaining_bytes < self.min_segment_size * 2 {
87            return false;
88        }
89
90        let current = self.current_segments.load(Ordering::Relaxed);
91        if current >= self.max_segments {
92            return false;
93        }
94
95        let threshold = avg_speed * 0.3;
96        segment_speed > 0.0 && segment_speed < threshold
97    }
98
99    pub fn record_split(&self) {
100        self.current_segments.fetch_add(1, Ordering::Relaxed);
101    }
102}
103
104#[derive(Debug, Clone)]
105pub enum SegmentAdjustment {
106    Split {
107        count: usize,
108        reason: AdjustmentReason,
109    },
110    #[allow(dead_code)]
111    Merge {
112        count: usize,
113        reason: AdjustmentReason,
114    },
115}
116
117#[derive(Debug, Clone)]
118pub enum AdjustmentReason {
119    BdpIncrease {
120        bdp: u64,
121        optimal: usize,
122    },
123    #[allow(dead_code)]
124    SlowSegment {
125        speed: f64,
126        threshold: f64,
127    },
128    #[allow(dead_code)]
129    ConnectionLimit,
130}