// oxigdal_streaming/io_coalescing.rs

//! Parallel I/O coalescing for cloud object storage reads.
//!
//! When reading multiple byte ranges from the same object (e.g., COG tiles),
//! it is more efficient to:
//! 1. Merge nearby ranges into a single larger request (coalescing)
//! 2. Issue remaining ranges in parallel
//!
//! This avoids the overhead of many small HTTP range requests.

/// A byte range `[start, end)` (exclusive end).
///
/// Two `u64` fields make this a trivially copyable value type, so it derives
/// `Copy`; `Hash` is derived so ranges can key hash maps/sets.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct ByteRange {
    /// Start offset (inclusive).
    pub start: u64,
    /// End offset (exclusive).
    pub end: u64,
}

impl ByteRange {
    /// Creates a new `ByteRange`.
    ///
    /// # Panics
    /// Panics in debug mode if `end < start`.
    pub fn new(start: u64, end: u64) -> Self {
        debug_assert!(end >= start, "end must be >= start");
        Self { start, end }
    }

    /// Returns the length of this range in bytes.
    ///
    /// Saturates to `0` for an inverted range (`end < start`), which can only
    /// occur if the debug assertion in [`ByteRange::new`] was compiled out.
    #[must_use]
    pub fn len(&self) -> u64 {
        self.end.saturating_sub(self.start)
    }

    /// Returns `true` if this range covers zero bytes.
    #[must_use]
    pub fn is_empty(&self) -> bool {
        self.start >= self.end
    }

    /// Returns `true` if this range overlaps with or directly adjoins `other`.
    ///
    /// "Adjoins" means `self.end == other.start` (or vice versa): merging two
    /// such ranges introduces no gap bytes.
    #[must_use]
    pub fn overlaps_or_adjoins(&self, other: &ByteRange) -> bool {
        self.start <= other.end && other.start <= self.end
    }

    /// Returns the smallest range that contains both `self` and `other`.
    #[must_use]
    pub fn merge(&self, other: &ByteRange) -> ByteRange {
        ByteRange {
            start: self.start.min(other.start),
            end: self.end.max(other.end),
        }
    }

    /// Returns the gap in bytes between `self.end` and `other.start`.
    /// Returns `0` if the ranges overlap or adjoin.
    #[must_use]
    pub fn gap_to(&self, other: &ByteRange) -> u64 {
        other.start.saturating_sub(self.end)
    }
}
63
/// Configuration for the I/O coalescing algorithm.
#[derive(Debug, Clone)]
pub struct CoalescingConfig {
    /// Merge two adjacent ranges when the gap between them is smaller than this threshold.
    pub max_gap_bytes: u64,
    /// Do not create a merged range larger than this limit.
    pub max_merged_size: u64,
    /// Maximum number of parallel fetch requests to issue.
    pub max_parallel_requests: usize,
}

impl Default for CoalescingConfig {
    /// Defaults tuned for HTTP range reads: an 8 KiB gap tolerance,
    /// a 16 MiB cap per merged request, and 8 requests in flight.
    fn default() -> Self {
        CoalescingConfig {
            max_gap_bytes: 8_192,
            max_merged_size: 16_777_216,
            max_parallel_requests: 8,
        }
    }
}
84
85/// A single coalesced (merged) fetch request with the original sub-ranges it covers.
86#[derive(Debug, Clone)]
87pub struct CoalescedRequest {
88    /// The merged byte range that should actually be fetched.
89    pub fetch_range: ByteRange,
90    /// The original ranges that are covered by `fetch_range`.
91    pub sub_ranges: Vec<ByteRange>,
92}
93
94impl CoalescedRequest {
95    /// Extracts the slice corresponding to `sub_range` from a buffer that contains
96    /// the full contents of `fetch_range`.
97    ///
98    /// Returns `None` if `sub_range` is not fully contained within `fetch_range`
99    /// or if the buffer is too short.
100    #[must_use]
101    pub fn extract<'a>(&self, merged_data: &'a [u8], sub_range: &ByteRange) -> Option<&'a [u8]> {
102        if sub_range.start < self.fetch_range.start || sub_range.end > self.fetch_range.end {
103            return None;
104        }
105        let offset = (sub_range.start - self.fetch_range.start) as usize;
106        let len = sub_range.len() as usize;
107        let end = offset + len;
108        if end <= merged_data.len() {
109            Some(&merged_data[offset..end])
110        } else {
111            None
112        }
113    }
114}
115
116/// Coalesces a list of byte ranges according to `config`.
117///
118/// Ranges whose gap is smaller than `config.max_gap_bytes` are merged into a
119/// single `CoalescedRequest`, provided the merged size does not exceed
120/// `config.max_merged_size`.
121pub fn coalesce_ranges(
122    mut ranges: Vec<ByteRange>,
123    config: &CoalescingConfig,
124) -> Vec<CoalescedRequest> {
125    if ranges.is_empty() {
126        return Vec::new();
127    }
128
129    ranges.sort();
130    ranges.dedup();
131
132    let mut result: Vec<CoalescedRequest> = Vec::new();
133    let mut current = CoalescedRequest {
134        fetch_range: ranges[0].clone(),
135        sub_ranges: vec![ranges[0].clone()],
136    };
137
138    for range in ranges.into_iter().skip(1) {
139        let gap = current.fetch_range.gap_to(&range);
140        let merged_size = range.end.saturating_sub(current.fetch_range.start);
141
142        if gap <= config.max_gap_bytes && merged_size <= config.max_merged_size {
143            current.fetch_range = current.fetch_range.merge(&range);
144            current.sub_ranges.push(range);
145        } else {
146            result.push(current);
147            current = CoalescedRequest {
148                fetch_range: range.clone(),
149                sub_ranges: vec![range],
150            };
151        }
152    }
153    result.push(current);
154    result
155}
156
/// Statistics describing the efficiency of a coalescing operation.
#[derive(Debug, Clone, Default)]
pub struct CoalescingStats {
    /// Number of original (pre-coalescing) range requests.
    pub original_requests: usize,
    /// Number of coalesced (post-coalescing) fetch requests.
    pub coalesced_requests: usize,
    /// Total bytes that will be fetched (including gap fill-in).
    pub bytes_fetched: u64,
    /// Total bytes actually needed (sum of original range lengths).
    pub bytes_needed: u64,
    /// Bytes fetched that are not part of any original range.
    pub overhead_bytes: u64,
}

impl CoalescingStats {
    /// Fraction of fetched bytes that are overhead (`0.0` = no overhead).
    ///
    /// Divides by `bytes_fetched` so the result matches the documented meaning
    /// and stays in `[0, 1]`; dividing by `bytes_needed` would over-report the
    /// ratio whenever gaps are filled in. Returns `0.0` when nothing is fetched.
    #[must_use]
    pub fn overhead_ratio(&self) -> f64 {
        if self.bytes_fetched == 0 {
            0.0
        } else {
            self.overhead_bytes as f64 / self.bytes_fetched as f64
        }
    }

    /// Fraction by which the request count was reduced (`0.0` = no reduction,
    /// `1.0` = all merged into one).
    #[must_use]
    pub fn request_reduction(&self) -> f64 {
        if self.original_requests == 0 {
            0.0
        } else {
            1.0 - self.coalesced_requests as f64 / self.original_requests as f64
        }
    }
}
194
195/// Computes [`CoalescingStats`] for the given original ranges and coalesced output.
196#[must_use]
197pub fn compute_stats(original: &[ByteRange], coalesced: &[CoalescedRequest]) -> CoalescingStats {
198    let bytes_needed: u64 = original.iter().map(ByteRange::len).sum();
199    let bytes_fetched: u64 = coalesced.iter().map(|c| c.fetch_range.len()).sum();
200    CoalescingStats {
201        original_requests: original.len(),
202        coalesced_requests: coalesced.len(),
203        bytes_fetched,
204        bytes_needed,
205        overhead_bytes: bytes_fetched.saturating_sub(bytes_needed),
206    }
207}