Skip to main content

scepter/reliability/
replica.rs

1use std::collections::BTreeMap;
2use std::ops::Range;
3
/// Health of a replica that could serve queries for a target range.
///
/// The derived `PartialOrd`/`Ord` follow declaration order, so
/// `Available < Recovering < Unavailable`.
#[derive(Clone, Copy, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)]
pub enum ReplicaState {
    /// Serving normally and not known to be recovering.
    Available,
    /// Able to serve, but its data may still be catching up.
    Recovering,
    /// Must not be chosen for query execution.
    Unavailable,
}
14
15/// Quality summary returned by a replica during replica resolution.
16#[derive(Debug, Clone, Copy, PartialEq)]
17pub struct ReplicaQuality {
18    /// Earliest timestamp covered by the replica.
19    pub start_time: u64,
20    /// Latest timestamp covered by the replica.
21    pub end_time: u64,
22    /// Number of points observed in the covered interval.
23    pub points: u64,
24    /// Expected points for a complete result in the covered interval.
25    pub expected_points: u64,
26    /// Whether the replica reports complete target coverage.
27    pub complete: bool,
28    /// Current operational state of the replica.
29    pub state: ReplicaState,
30}
31
32impl ReplicaQuality {
33    /// Creates a quality summary.
34    pub fn new(
35        start_time: u64,
36        end_time: u64,
37        points: u64,
38        expected_points: u64,
39        complete: bool,
40        state: ReplicaState,
41    ) -> Self {
42        Self {
43            start_time,
44            end_time,
45            points,
46            expected_points,
47            complete,
48            state,
49        }
50    }
51
52    /// Returns the covered time span.
53    pub fn coverage_span(self) -> u64 {
54        self.end_time.saturating_sub(self.start_time)
55    }
56
57    /// Returns observed point density as `points / expected_points`.
58    pub fn density(self) -> f64 {
59        if self.expected_points == 0 {
60            return 1.0;
61        }
62        (self.points as f64 / self.expected_points as f64).min(1.0)
63    }
64
65    /// Returns true if this replica can be selected.
66    pub fn selectable(self) -> bool {
67        self.state != ReplicaState::Unavailable
68    }
69
70    fn rank(self) -> ReplicaRank {
71        ReplicaRank {
72            selectable: self.selectable(),
73            complete: self.complete,
74            state: self.state,
75            density: self.density(),
76            coverage_span: self.coverage_span(),
77            end_time: self.end_time,
78            points: self.points,
79        }
80    }
81}
82
83/// Candidate replica for one target range.
84#[derive(Debug, Clone, PartialEq)]
85pub struct ReplicaCandidate<ReplicaId> {
86    /// Target range covered by this candidate.
87    pub range: Range<Vec<u8>>,
88    /// Replica identifier.
89    pub replica: ReplicaId,
90    /// Replica quality summary.
91    pub quality: ReplicaQuality,
92}
93
94impl<ReplicaId> ReplicaCandidate<ReplicaId> {
95    /// Creates a replica candidate.
96    pub fn new(range: Range<Vec<u8>>, replica: ReplicaId, quality: ReplicaQuality) -> Self {
97        Self {
98            range,
99            replica,
100            quality,
101        }
102    }
103}
104
105/// Selected primary and fallback replicas for one target range.
106#[derive(Debug, Clone, PartialEq)]
107pub struct ResolvedRange<ReplicaId> {
108    /// Target range being resolved.
109    pub range: Range<Vec<u8>>,
110    /// Primary replica selected for query execution.
111    pub primary: ReplicaId,
112    /// Equivalent fallback replicas ordered by quality.
113    pub fallbacks: Vec<ReplicaId>,
114    /// Quality of the selected primary.
115    pub quality: ReplicaQuality,
116}
117
118/// Selects the best replica per range and records ordered fallbacks.
119#[derive(Debug, Clone, Copy, PartialEq, Eq)]
120pub struct ReplicaResolver {
121    max_fallbacks: usize,
122}
123
124impl Default for ReplicaResolver {
125    fn default() -> Self {
126        Self { max_fallbacks: 2 }
127    }
128}
129
130impl ReplicaResolver {
131    /// Creates a resolver that keeps up to two fallbacks per range.
132    pub fn new() -> Self {
133        Self::default()
134    }
135
136    /// Creates a resolver with an explicit fallback limit.
137    pub fn with_max_fallbacks(max_fallbacks: usize) -> Self {
138        Self { max_fallbacks }
139    }
140
141    /// Returns the fallback limit.
142    pub fn max_fallbacks(self) -> usize {
143        self.max_fallbacks
144    }
145
146    /// Resolves candidate replicas into one primary per target range.
147    pub fn resolve<ReplicaId>(
148        self,
149        candidates: impl IntoIterator<Item = ReplicaCandidate<ReplicaId>>,
150    ) -> Vec<ResolvedRange<ReplicaId>> {
151        let mut by_range = BTreeMap::<RangeKey, Vec<ReplicaCandidate<ReplicaId>>>::new();
152        for candidate in candidates {
153            by_range
154                .entry(RangeKey::from(candidate.range.clone()))
155                .or_default()
156                .push(candidate);
157        }
158
159        let mut resolved = Vec::new();
160        for (_, mut candidates) in by_range {
161            candidates.retain(|candidate| candidate.quality.selectable());
162            if candidates.is_empty() {
163                continue;
164            }
165
166            candidates.sort_by(|left, right| compare_quality(right.quality, left.quality));
167            let primary = candidates.remove(0);
168            let fallbacks = candidates
169                .into_iter()
170                .take(self.max_fallbacks)
171                .map(|candidate| candidate.replica)
172                .collect();
173
174            resolved.push(ResolvedRange {
175                range: primary.range,
176                primary: primary.replica,
177                fallbacks,
178                quality: primary.quality,
179            });
180        }
181        resolved
182    }
183}
184
/// Owned, totally ordered form of a `Range<Vec<u8>>`, used to group candidates
/// by exact range. Ordering is lexicographic on `start`, then on `end`.
#[derive(Clone, Debug, Eq, Ord, PartialEq, PartialOrd)]
struct RangeKey {
    start: Vec<u8>,
    end: Vec<u8>,
}

impl From<Range<Vec<u8>>> for RangeKey {
    fn from(Range { start, end }: Range<Vec<u8>>) -> Self {
        RangeKey { start, end }
    }
}
199
200#[derive(Debug, Clone, Copy, PartialEq)]
201struct ReplicaRank {
202    selectable: bool,
203    complete: bool,
204    state: ReplicaState,
205    density: f64,
206    coverage_span: u64,
207    end_time: u64,
208    points: u64,
209}
210
211fn compare_quality(left: ReplicaQuality, right: ReplicaQuality) -> std::cmp::Ordering {
212    let left = left.rank();
213    let right = right.rank();
214    left.selectable
215        .cmp(&right.selectable)
216        .then_with(|| left.complete.cmp(&right.complete))
217        .then_with(|| state_score(left.state).cmp(&state_score(right.state)))
218        .then_with(|| left.density.total_cmp(&right.density))
219        .then_with(|| left.coverage_span.cmp(&right.coverage_span))
220        .then_with(|| left.end_time.cmp(&right.end_time))
221        .then_with(|| left.points.cmp(&right.points))
222}
223
224fn state_score(state: ReplicaState) -> u8 {
225    match state {
226        ReplicaState::Available => 2,
227        ReplicaState::Recovering => 1,
228        ReplicaState::Unavailable => 0,
229    }
230}
231
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a quality summary over the fixed time window `10..40`.
    fn quality(
        points: u64,
        expected_points: u64,
        complete: bool,
        state: ReplicaState,
    ) -> ReplicaQuality {
        ReplicaQuality::new(10, 40, points, expected_points, complete, state)
    }

    #[test]
    fn resolver_selects_best_replica_and_orders_fallbacks() {
        let candidates = vec![
            ReplicaCandidate::new(
                b"a".to_vec()..b"m".to_vec(),
                "recovering",
                quality(100, 100, true, ReplicaState::Recovering),
            ),
            ReplicaCandidate::new(
                b"a".to_vec()..b"m".to_vec(),
                "primary",
                quality(95, 100, true, ReplicaState::Available),
            ),
            ReplicaCandidate::new(
                b"a".to_vec()..b"m".to_vec(),
                "fallback",
                quality(80, 100, true, ReplicaState::Available),
            ),
        ];

        let resolved = ReplicaResolver::with_max_fallbacks(1).resolve(candidates);

        assert_eq!(resolved.len(), 1);
        assert_eq!(resolved[0].primary, "primary");
        assert_eq!(resolved[0].fallbacks, vec!["fallback"]);
    }

    #[test]
    fn resolver_skips_unavailable_replicas() {
        let candidates = vec![
            ReplicaCandidate::new(
                b"a".to_vec()..b"m".to_vec(),
                "down",
                quality(100, 100, true, ReplicaState::Unavailable),
            ),
            ReplicaCandidate::new(
                b"m".to_vec()..b"z".to_vec(),
                "up",
                quality(50, 100, false, ReplicaState::Available),
            ),
        ];

        let resolved = ReplicaResolver::new().resolve(candidates);

        assert_eq!(resolved.len(), 1);
        assert_eq!(resolved[0].range, b"m".to_vec()..b"z".to_vec());
        assert_eq!(resolved[0].primary, "up");
    }

    #[test]
    fn resolver_returns_nothing_for_empty_input() {
        let resolved = ReplicaResolver::new().resolve(Vec::<ReplicaCandidate<&str>>::new());

        assert!(resolved.is_empty());
        assert_eq!(ReplicaResolver::new().max_fallbacks(), 2);
    }

    #[test]
    fn resolver_orders_output_by_range_and_prefers_complete_coverage() {
        let candidates = vec![
            ReplicaCandidate::new(
                b"m".to_vec()..b"z".to_vec(),
                "late",
                quality(10, 100, true, ReplicaState::Available),
            ),
            ReplicaCandidate::new(
                b"a".to_vec()..b"m".to_vec(),
                "partial",
                quality(100, 100, false, ReplicaState::Available),
            ),
            ReplicaCandidate::new(
                b"a".to_vec()..b"m".to_vec(),
                "complete",
                quality(50, 100, true, ReplicaState::Recovering),
            ),
        ];

        let resolved = ReplicaResolver::with_max_fallbacks(0).resolve(candidates);

        assert_eq!(resolved.len(), 2);
        assert_eq!(resolved[0].range, b"a".to_vec()..b"m".to_vec());
        // Complete coverage outranks operational state.
        assert_eq!(resolved[0].primary, "complete");
        assert_eq!(resolved[0].quality.state, ReplicaState::Recovering);
        assert!(resolved[0].fallbacks.is_empty());
        assert_eq!(resolved[1].range, b"m".to_vec()..b"z".to_vec());
        assert_eq!(resolved[1].primary, "late");
    }

    #[test]
    fn quality_reports_density_and_coverage() {
        let quality = ReplicaQuality::new(10, 42, 25, 50, false, ReplicaState::Available);

        assert_eq!(quality.coverage_span(), 32);
        assert_eq!(quality.density(), 0.5);
        assert!(quality.selectable());
    }

    #[test]
    fn density_handles_zero_expected_and_overreporting() {
        let nothing_expected = ReplicaQuality::new(0, 0, 0, 0, false, ReplicaState::Available);
        assert_eq!(nothing_expected.density(), 1.0);

        let overreporting = quality(150, 100, true, ReplicaState::Available);
        assert_eq!(overreporting.density(), 1.0);
    }

    #[test]
    fn coverage_span_saturates_when_end_precedes_start() {
        let inverted = ReplicaQuality::new(50, 10, 0, 0, false, ReplicaState::Available);

        assert_eq!(inverted.coverage_span(), 0);
    }

    #[test]
    fn only_unavailable_replicas_are_unselectable() {
        assert!(!quality(100, 100, true, ReplicaState::Unavailable).selectable());
        assert!(quality(0, 100, false, ReplicaState::Recovering).selectable());
        assert!(quality(0, 100, false, ReplicaState::Available).selectable());
    }
}