//! I/O range coalescing (oxigdal_streaming/io_coalescing.rs): merges nearby
//! byte-range requests into fewer, larger fetches.
/// A half-open byte range `[start, end)`.
///
/// `end` is exclusive: `len` is `end - start` and a range with
/// `start >= end` is empty. The derived `Ord` sorts by `start` first,
/// then `end`, which is the order `coalesce_ranges` relies on.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
pub struct ByteRange {
    // Inclusive start offset, in bytes.
    pub start: u64,
    // Exclusive end offset, in bytes; expected to be >= start.
    pub end: u64,
}
18
19impl ByteRange {
20 pub fn new(start: u64, end: u64) -> Self {
25 debug_assert!(end >= start, "end must be >= start");
26 Self { start, end }
27 }
28
29 #[must_use]
31 pub fn len(&self) -> u64 {
32 self.end.saturating_sub(self.start)
33 }
34
35 #[must_use]
37 pub fn is_empty(&self) -> bool {
38 self.start >= self.end
39 }
40
41 #[must_use]
43 pub fn overlaps_or_adjoins(&self, other: &ByteRange) -> bool {
44 self.start <= other.end && other.start <= self.end
45 }
46
47 #[must_use]
49 pub fn merge(&self, other: &ByteRange) -> ByteRange {
50 ByteRange {
51 start: self.start.min(other.start),
52 end: self.end.max(other.end),
53 }
54 }
55
56 #[must_use]
59 pub fn gap_to(&self, other: &ByteRange) -> u64 {
60 other.start.saturating_sub(self.end)
61 }
62}
63
/// Tuning knobs for byte-range coalescing.
#[derive(Debug, Clone)]
pub struct CoalescingConfig {
    // Largest gap (in bytes) between two ranges that may still be fetched as
    // one merged request; gap bytes are over-read and later discarded.
    pub max_gap_bytes: u64,
    // Upper bound on the total span of a single merged fetch.
    pub max_merged_size: u64,
    // Cap on concurrent fetches.
    // NOTE(review): not consumed anywhere in this file — presumably read by
    // the fetch scheduler elsewhere; confirm against callers.
    pub max_parallel_requests: usize,
}
74
75impl Default for CoalescingConfig {
76 fn default() -> Self {
77 Self {
78 max_gap_bytes: 8 * 1024,
79 max_merged_size: 16 * 1024 * 1024,
80 max_parallel_requests: 8,
81 }
82 }
83}
84
/// One physical fetch standing in for several logically requested ranges.
#[derive(Debug, Clone)]
pub struct CoalescedRequest {
    // The single contiguous range actually fetched.
    pub fetch_range: ByteRange,
    // The original caller-requested ranges; as produced by `coalesce_ranges`
    // each is contained within `fetch_range`.
    pub sub_ranges: Vec<ByteRange>,
}
93
94impl CoalescedRequest {
95 #[must_use]
101 pub fn extract<'a>(&self, merged_data: &'a [u8], sub_range: &ByteRange) -> Option<&'a [u8]> {
102 if sub_range.start < self.fetch_range.start || sub_range.end > self.fetch_range.end {
103 return None;
104 }
105 let offset = (sub_range.start - self.fetch_range.start) as usize;
106 let len = sub_range.len() as usize;
107 let end = offset + len;
108 if end <= merged_data.len() {
109 Some(&merged_data[offset..end])
110 } else {
111 None
112 }
113 }
114}
115
116pub fn coalesce_ranges(
122 mut ranges: Vec<ByteRange>,
123 config: &CoalescingConfig,
124) -> Vec<CoalescedRequest> {
125 if ranges.is_empty() {
126 return Vec::new();
127 }
128
129 ranges.sort();
130 ranges.dedup();
131
132 let mut result: Vec<CoalescedRequest> = Vec::new();
133 let mut current = CoalescedRequest {
134 fetch_range: ranges[0].clone(),
135 sub_ranges: vec![ranges[0].clone()],
136 };
137
138 for range in ranges.into_iter().skip(1) {
139 let gap = current.fetch_range.gap_to(&range);
140 let merged_size = range.end.saturating_sub(current.fetch_range.start);
141
142 if gap <= config.max_gap_bytes && merged_size <= config.max_merged_size {
143 current.fetch_range = current.fetch_range.merge(&range);
144 current.sub_ranges.push(range);
145 } else {
146 result.push(current);
147 current = CoalescedRequest {
148 fetch_range: range.clone(),
149 sub_ranges: vec![range],
150 };
151 }
152 }
153 result.push(current);
154 result
155}
156
/// Summary of how effective a coalescing pass was.
#[derive(Debug, Clone, Default)]
pub struct CoalescingStats {
    // Number of ranges requested before coalescing.
    pub original_requests: usize,
    // Number of merged fetches after coalescing.
    pub coalesced_requests: usize,
    // Total bytes covered by the merged fetch ranges.
    pub bytes_fetched: u64,
    // Total bytes actually requested by callers.
    pub bytes_needed: u64,
    // Extra bytes fetched beyond what was needed
    // (bytes_fetched - bytes_needed, saturating at 0).
    pub overhead_bytes: u64,
}
171
172impl CoalescingStats {
173 #[must_use]
175 pub fn overhead_ratio(&self) -> f64 {
176 if self.bytes_needed == 0 {
177 0.0
178 } else {
179 self.overhead_bytes as f64 / self.bytes_needed as f64
180 }
181 }
182
183 #[must_use]
186 pub fn request_reduction(&self) -> f64 {
187 if self.original_requests == 0 {
188 0.0
189 } else {
190 1.0 - self.coalesced_requests as f64 / self.original_requests as f64
191 }
192 }
193}
194
195#[must_use]
197pub fn compute_stats(original: &[ByteRange], coalesced: &[CoalescedRequest]) -> CoalescingStats {
198 let bytes_needed: u64 = original.iter().map(ByteRange::len).sum();
199 let bytes_fetched: u64 = coalesced.iter().map(|c| c.fetch_range.len()).sum();
200 CoalescingStats {
201 original_requests: original.len(),
202 coalesced_requests: coalesced.len(),
203 bytes_fetched,
204 bytes_needed,
205 overhead_bytes: bytes_fetched.saturating_sub(bytes_needed),
206 }
207}