1use std::collections::BTreeMap;
2use std::ops::Range;
3
/// State of a replica, used to filter and rank candidates.
///
/// The derived `Ord` follows declaration order, so
/// `Available < Recovering < Unavailable`. Candidate ranking does not rely on
/// that ordering; it goes through `state_score`, which prefers `Available`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum ReplicaState {
    /// Scores highest when candidates are ranked.
    Available,
    /// Still selectable, but scores below `Available` when ranked.
    Recovering,
    /// Never selectable: `ReplicaQuality::selectable` returns `false`.
    Unavailable,
}
14
15#[derive(Debug, Clone, Copy, PartialEq)]
17pub struct ReplicaQuality {
18 pub start_time: u64,
20 pub end_time: u64,
22 pub points: u64,
24 pub expected_points: u64,
26 pub complete: bool,
28 pub state: ReplicaState,
30}
31
32impl ReplicaQuality {
33 pub fn new(
35 start_time: u64,
36 end_time: u64,
37 points: u64,
38 expected_points: u64,
39 complete: bool,
40 state: ReplicaState,
41 ) -> Self {
42 Self {
43 start_time,
44 end_time,
45 points,
46 expected_points,
47 complete,
48 state,
49 }
50 }
51
52 pub fn coverage_span(self) -> u64 {
54 self.end_time.saturating_sub(self.start_time)
55 }
56
57 pub fn density(self) -> f64 {
59 if self.expected_points == 0 {
60 return 1.0;
61 }
62 (self.points as f64 / self.expected_points as f64).min(1.0)
63 }
64
65 pub fn selectable(self) -> bool {
67 self.state != ReplicaState::Unavailable
68 }
69
70 fn rank(self) -> ReplicaRank {
71 ReplicaRank {
72 selectable: self.selectable(),
73 complete: self.complete,
74 state: self.state,
75 density: self.density(),
76 coverage_span: self.coverage_span(),
77 end_time: self.end_time,
78 points: self.points,
79 }
80 }
81}
82
83#[derive(Debug, Clone, PartialEq)]
85pub struct ReplicaCandidate<ReplicaId> {
86 pub range: Range<Vec<u8>>,
88 pub replica: ReplicaId,
90 pub quality: ReplicaQuality,
92}
93
94impl<ReplicaId> ReplicaCandidate<ReplicaId> {
95 pub fn new(range: Range<Vec<u8>>, replica: ReplicaId, quality: ReplicaQuality) -> Self {
97 Self {
98 range,
99 replica,
100 quality,
101 }
102 }
103}
104
105#[derive(Debug, Clone, PartialEq)]
107pub struct ResolvedRange<ReplicaId> {
108 pub range: Range<Vec<u8>>,
110 pub primary: ReplicaId,
112 pub fallbacks: Vec<ReplicaId>,
114 pub quality: ReplicaQuality,
116}
117
/// Picks a primary replica and ordered fallbacks for each distinct range.
///
/// The resolver is a small `Copy` configuration value; resolving does not
/// mutate it.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct ReplicaResolver {
    /// Upper bound on the number of fallbacks reported per resolved range.
    max_fallbacks: usize,
}
123
124impl Default for ReplicaResolver {
125 fn default() -> Self {
126 Self { max_fallbacks: 2 }
127 }
128}
129
130impl ReplicaResolver {
131 pub fn new() -> Self {
133 Self::default()
134 }
135
136 pub fn with_max_fallbacks(max_fallbacks: usize) -> Self {
138 Self { max_fallbacks }
139 }
140
141 pub fn max_fallbacks(self) -> usize {
143 self.max_fallbacks
144 }
145
146 pub fn resolve<ReplicaId>(
148 self,
149 candidates: impl IntoIterator<Item = ReplicaCandidate<ReplicaId>>,
150 ) -> Vec<ResolvedRange<ReplicaId>> {
151 let mut by_range = BTreeMap::<RangeKey, Vec<ReplicaCandidate<ReplicaId>>>::new();
152 for candidate in candidates {
153 by_range
154 .entry(RangeKey::from(candidate.range.clone()))
155 .or_default()
156 .push(candidate);
157 }
158
159 let mut resolved = Vec::new();
160 for (_, mut candidates) in by_range {
161 candidates.retain(|candidate| candidate.quality.selectable());
162 if candidates.is_empty() {
163 continue;
164 }
165
166 candidates.sort_by(|left, right| compare_quality(right.quality, left.quality));
167 let primary = candidates.remove(0);
168 let fallbacks = candidates
169 .into_iter()
170 .take(self.max_fallbacks)
171 .map(|candidate| candidate.replica)
172 .collect();
173
174 resolved.push(ResolvedRange {
175 range: primary.range,
176 primary: primary.replica,
177 fallbacks,
178 quality: primary.quality,
179 });
180 }
181 resolved
182 }
183}
184
/// Owned, orderable form of a `Range<Vec<u8>>`, used as the `BTreeMap` key
/// when candidates are grouped by range.
///
/// The derived `Ord` compares `start` first and `end` second, each
/// lexicographically by bytes.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
struct RangeKey {
    /// Start bound of the range.
    start: Vec<u8>,
    /// End bound of the range.
    end: Vec<u8>,
}
190
191impl From<Range<Vec<u8>>> for RangeKey {
192 fn from(range: Range<Vec<u8>>) -> Self {
193 Self {
194 start: range.start,
195 end: range.end,
196 }
197 }
198}
199
200#[derive(Debug, Clone, Copy, PartialEq)]
201struct ReplicaRank {
202 selectable: bool,
203 complete: bool,
204 state: ReplicaState,
205 density: f64,
206 coverage_span: u64,
207 end_time: u64,
208 points: u64,
209}
210
211fn compare_quality(left: ReplicaQuality, right: ReplicaQuality) -> std::cmp::Ordering {
212 let left = left.rank();
213 let right = right.rank();
214 left.selectable
215 .cmp(&right.selectable)
216 .then_with(|| left.complete.cmp(&right.complete))
217 .then_with(|| state_score(left.state).cmp(&state_score(right.state)))
218 .then_with(|| left.density.total_cmp(&right.density))
219 .then_with(|| left.coverage_span.cmp(&right.coverage_span))
220 .then_with(|| left.end_time.cmp(&right.end_time))
221 .then_with(|| left.points.cmp(&right.points))
222}
223
224fn state_score(state: ReplicaState) -> u8 {
225 match state {
226 ReplicaState::Available => 2,
227 ReplicaState::Recovering => 1,
228 ReplicaState::Unavailable => 0,
229 }
230}
231
#[cfg(test)]
mod tests {
    use super::*;

    /// Quality over the fixed time window `10..40`, so tests vary only the ranking inputs.
    fn quality(
        points: u64,
        expected_points: u64,
        complete: bool,
        state: ReplicaState,
    ) -> ReplicaQuality {
        ReplicaQuality::new(10, 40, points, expected_points, complete, state)
    }

    /// Candidate named `replica` that covers the byte range `start..end`.
    fn candidate(
        start: &[u8],
        end: &[u8],
        replica: &'static str,
        quality: ReplicaQuality,
    ) -> ReplicaCandidate<&'static str> {
        ReplicaCandidate::new(start.to_vec()..end.to_vec(), replica, quality)
    }

    #[test]
    fn resolver_selects_best_replica_and_orders_fallbacks() {
        // Input order is deliberately different from rank order.
        let recovering = quality(100, 100, true, ReplicaState::Recovering);
        let best = quality(95, 100, true, ReplicaState::Available);
        let runner_up = quality(80, 100, true, ReplicaState::Available);
        let resolver = ReplicaResolver::with_max_fallbacks(1);

        let resolved = resolver.resolve(vec![
            candidate(b"a", b"m", "recovering", recovering),
            candidate(b"a", b"m", "primary", best),
            candidate(b"a", b"m", "fallback", runner_up),
        ]);

        assert_eq!(resolved.len(), 1);
        let only = &resolved[0];
        assert_eq!(only.primary, "primary");
        assert_eq!(only.fallbacks, vec!["fallback"]);
    }

    #[test]
    fn resolver_skips_unavailable_replicas() {
        let down = quality(100, 100, true, ReplicaState::Unavailable);
        let up = quality(50, 100, false, ReplicaState::Available);

        let resolved = ReplicaResolver::new().resolve(vec![
            candidate(b"a", b"m", "down", down),
            candidate(b"m", b"z", "up", up),
        ]);

        // The range served only by an unavailable replica is left out entirely.
        assert_eq!(resolved.len(), 1);
        let only = &resolved[0];
        assert_eq!(only.range, b"m".to_vec()..b"z".to_vec());
        assert_eq!(only.primary, "up");
    }

    #[test]
    fn quality_reports_density_and_coverage() {
        let half_full = ReplicaQuality::new(10, 42, 25, 50, false, ReplicaState::Available);

        assert_eq!(half_full.coverage_span(), 32);
        assert_eq!(half_full.density(), 0.5);
        assert!(half_full.selectable());
    }
}