openraft/progress/
mod.rs

1//! Progress tracks replication state, i.e., it can be considered a map of node id to already
2//! replicated log id.
3//!
//! Internally, a progress is a vector of scalar values.
//! Each scalar value only ever increases; decreasing it is not allowed.
6//! Optimization on calculating the committed log id is done on this assumption.
7
8#[cfg(feature = "bench")]
9#[cfg(test)]
10mod bench;
11pub(crate) mod entry;
12pub(crate) mod inflight;
13
14use std::borrow::Borrow;
15use std::fmt::Debug;
16use std::fmt::Display;
17use std::fmt::Formatter;
18use std::slice::Iter;
19use std::slice::IterMut;
20
21// TODO: remove it
22#[allow(unused_imports)]
23pub(crate) use inflight::Inflight;
24
25use crate::quorum::QuorumSet;
26
27/// Track progress of several incremental values.
28///
29/// When one of the value is updated, it uses a `QuorumSet` to calculate the committed value.
30/// `ID` is the identifier of every progress value.
31/// `V` is type of a progress entry.
32/// `P` is the progress data of `V`, a progress entry `V` could contain other user data.
33/// `QS` is a quorum set implementation.
34pub(crate) trait Progress<ID, V, P, QS>
35where
36    ID: 'static,
37    V: Borrow<P>,
38    QS: QuorumSet<ID>,
39{
40    /// Update one of the scalar value and re-calculate the committed value with provided function.
41    ///
42    /// It returns Err(committed) if the `id` is not found.
43    /// The provided function `f` update the value of `id`.
44    fn update_with<F>(&mut self, id: &ID, f: F) -> Result<&P, &P>
45    where F: FnOnce(&mut V);
46
47    /// Update one of the scalar value and re-calculate the committed value.
48    ///
49    /// It returns Err(committed) if the `id` is not found.
50    fn update(&mut self, id: &ID, value: V) -> Result<&P, &P> {
51        self.update_with(id, |x| *x = value)
52    }
53
54    /// Update the value if the new value is greater than the current value.
55    ///
56    /// It returns Err(committed) if the `id` is not found.
57    fn increase_to(&mut self, id: &ID, value: V) -> Result<&P, &P>
58    where V: PartialOrd {
59        self.update_with(id, |x| {
60            if value > *x {
61                *x = value;
62            }
63        })
64    }
65
66    /// Try to get the value by `id`.
67    #[allow(dead_code)]
68    fn try_get(&self, id: &ID) -> Option<&V>;
69
70    /// Returns a mutable reference to the value corresponding to the `id`.
71    fn get_mut(&mut self, id: &ID) -> Option<&mut V>;
72
73    // TODO: merge `get` and `try_get`
74    /// Get the value by `id`.
75    #[allow(dead_code)]
76    fn get(&self, id: &ID) -> &V;
77
78    /// Get the greatest value that is granted by a quorum defined in [`Self::quorum_set()`].
79    ///
80    /// In raft or other distributed consensus,
81    /// To commit a value, the value has to be **granted by a quorum** and has to be the greatest
82    /// value every proposed.
83    #[allow(dead_code)]
84    fn granted(&self) -> &P;
85
86    /// Returns the reference to the quorum set
87    #[allow(dead_code)]
88    fn quorum_set(&self) -> &QS;
89
90    /// Iterate over all id and values, voters first followed by learners.
91    fn iter(&self) -> Iter<(ID, V)>;
92
93    /// Build a new instance with the new quorum set, inheriting progress data from `self`.
94    fn upgrade_quorum_set(self, quorum_set: QS, learner_ids: &[ID], default_v: V) -> Self;
95
96    /// Return if the given id is a voter.
97    ///
98    /// A voter is a node in the quorum set that can grant a value.
99    /// A learner's progress is also tracked but it will never grant a value.
100    ///
101    /// If the given id is not in this `Progress`, it returns `None`.
102    fn is_voter(&self, id: &ID) -> Option<bool>;
103}
104
105/// A Progress implementation with vector as storage.
106///
107/// Suitable for small quorum set.
108#[derive(Clone, Debug)]
109#[derive(PartialEq, Eq)]
110pub(crate) struct VecProgress<ID, V, P, QS>
111where
112    ID: 'static,
113    QS: QuorumSet<ID>,
114{
115    /// Quorum set to determine if a set of `id` constitutes a quorum, i.e., committed.
116    quorum_set: QS,
117
118    /// Currently already committed value.
119    granted: P,
120
121    /// Number of voters
122    voter_count: usize,
123
124    /// Progress data.
125    ///
126    /// Elements with values greater than the `granted` are sorted in descending order.
127    /// Others are unsorted.
128    ///
129    /// The first `voter_count` elements are voters, the left are learners.
130    /// Learner elements are always still.
131    /// A voter element will be moved up to keep them in a descending order, when a new value is
132    /// updated.
133    vector: Vec<(ID, V)>,
134
135    /// Statistics of how it runs.
136    stat: Stat,
137}
138
139impl<ID, V, P, QS> Display for VecProgress<ID, V, P, QS>
140where
141    ID: PartialEq + Debug + Clone + 'static,
142    V: Clone + 'static,
143    V: Borrow<P>,
144    P: PartialOrd + Ord + Clone + 'static,
145    QS: QuorumSet<ID> + 'static,
146    ID: Display,
147    V: Display,
148{
149    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
150        write!(f, "{{")?;
151        for (i, (id, v)) in self.iter().enumerate() {
152            if i > 0 {
153                write!(f, ", ")?;
154            }
155            write!(f, "{}: {}", id, v)?
156        }
157        write!(f, "}}")?;
158
159        Ok(())
160    }
161}
162
/// Counters describing how a `VecProgress` has been exercised.
#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub(crate) struct Stat {
    /// Incremented on every call to `update_with`.
    update_count: u64,

    /// Incremented on every call to `move_up`.
    move_count: u64,

    /// Incremented every time the quorum set is asked whether a set of ids is a quorum.
    is_quorum_count: u64,
}
170
171impl<ID, V, P, QS> VecProgress<ID, V, P, QS>
172where
173    ID: PartialEq + Clone + Debug + 'static,
174    V: Clone + 'static,
175    V: Borrow<P>,
176    P: PartialOrd + Ord + Clone + 'static,
177    QS: QuorumSet<ID>,
178{
179    pub(crate) fn new(quorum_set: QS, learner_ids: impl IntoIterator<Item = ID>, default_v: V) -> Self {
180        let mut vector = quorum_set.ids().map(|id| (id, default_v.clone())).collect::<Vec<_>>();
181
182        let voter_count = vector.len();
183
184        vector.extend(learner_ids.into_iter().map(|id| (id, default_v.clone())));
185
186        Self {
187            quorum_set,
188            granted: default_v.borrow().clone(),
189            voter_count,
190            vector,
191            stat: Default::default(),
192        }
193    }
194
195    /// Find the index in of the specified id.
196    #[inline(always)]
197    pub(crate) fn index(&self, target: &ID) -> Option<usize> {
198        for (i, elt) in self.vector.iter().enumerate() {
199            if elt.0 == *target {
200                return Some(i);
201            }
202        }
203
204        None
205    }
206
207    /// Move an element at `index` up so that all the values greater than `committed` are sorted.
208    #[inline(always)]
209    fn move_up(&mut self, index: usize) -> usize {
210        self.stat.move_count += 1;
211        for i in (0..index).rev() {
212            if self.vector[i].1.borrow() < self.vector[i + 1].1.borrow() {
213                self.vector.swap(i, i + 1);
214            } else {
215                return i + 1;
216            }
217        }
218
219        0
220    }
221
222    pub(crate) fn iter_mut(&mut self) -> IterMut<(ID, V)> {
223        self.vector.iter_mut()
224    }
225
226    #[allow(dead_code)]
227    pub(crate) fn stat(&self) -> &Stat {
228        &self.stat
229    }
230}
231
232impl<ID, V, P, QS> Progress<ID, V, P, QS> for VecProgress<ID, V, P, QS>
233where
234    ID: PartialEq + Debug + Clone + 'static,
235    V: Clone + 'static,
236    V: Borrow<P>,
237    P: PartialOrd + Ord + Clone + 'static,
238    QS: QuorumSet<ID> + 'static,
239{
240    /// Update one of the scalar value and re-calculate the committed value.
241    ///
242    /// Re-updating with a same V will do nothing.
243    ///
244    /// # Algorithm
245    ///
246    /// Only when the **previous value** is less or equal the committed,
247    /// and the **new value** is greater than the committed,
248    /// there is possibly an update to the committed.
249    ///
250    /// This way it gets rid of a portion of unnecessary re-calculation of committed,
251    /// and avoids unnecessary sorting: progresses are kept in order and only values greater than
252    /// committed need to sort.
253    ///
254    /// E.g., given 3 ids with value `1,3,5`, as shown in the figure below:
255    ///
256    /// ```text
257    /// a -----------+-------->
258    /// b -------+------------>
259    /// c ---+---------------->
260    /// ------------------------------
261    ///      1   3   5
262    /// ```
263    ///
264    /// the committed is `3` and assumes a majority quorum set is used.
265    /// Then:
266    /// - update(a, 6): nothing to do: committed is still 3;
267    /// - update(b, 4): re-calc:       committed becomes 4;
268    /// - update(b, 6): re-calc:       committed becomes 5;
269    /// - update(c, 2): nothing to do: committed is still 3;
270    /// - update(c, 3): nothing to do: committed is still 3;
271    /// - update(c, 4): re-calc:       committed becomes 4;
272    /// - update(c, 6): re-calc:       committed becomes 5;
273    fn update_with<F>(&mut self, id: &ID, f: F) -> Result<&P, &P>
274    where F: FnOnce(&mut V) {
275        self.stat.update_count += 1;
276
277        let index = match self.index(id) {
278            None => {
279                return Err(&self.granted);
280            }
281            Some(x) => x,
282        };
283
284        let elt = &mut self.vector[index];
285
286        let prev_progress = elt.1.borrow().clone();
287
288        f(&mut elt.1);
289
290        let new_progress = elt.1.borrow();
291
292        debug_assert!(new_progress >= &prev_progress,);
293
294        let prev_le_granted = prev_progress <= self.granted;
295        let new_gt_granted = new_progress > &self.granted;
296
297        if &prev_progress == new_progress {
298            return Ok(&self.granted);
299        }
300
301        // Learner does not grant a value.
302        // And it won't be moved up to adjust the order.
303        if index >= self.voter_count {
304            return Ok(&self.granted);
305        }
306
307        // Sort and find the greatest value granted by a quorum set.
308
309        if prev_le_granted && new_gt_granted {
310            let new_index = self.move_up(index);
311
312            // From high to low, find the max value that has constituted a quorum.
313            for i in new_index..self.voter_count {
314                let prog = self.vector[i].1.borrow();
315
316                // No need to re-calculate already committed value.
317                if prog <= &self.granted {
318                    break;
319                }
320
321                // Ids of the target that has value GE `vector[i]`
322                let it = self.vector[0..=i].iter().map(|x| &x.0);
323
324                self.stat.is_quorum_count += 1;
325
326                if self.quorum_set.is_quorum(it) {
327                    self.granted = prog.clone();
328                    break;
329                }
330            }
331        }
332
333        Ok(&self.granted)
334    }
335
336    #[allow(dead_code)]
337    fn try_get(&self, id: &ID) -> Option<&V> {
338        let index = self.index(id)?;
339        Some(&self.vector[index].1)
340    }
341
342    fn get_mut(&mut self, id: &ID) -> Option<&mut V> {
343        let index = self.index(id)?;
344        Some(&mut self.vector[index].1)
345    }
346
347    #[allow(dead_code)]
348    fn get(&self, id: &ID) -> &V {
349        let index = self.index(id).unwrap();
350        &self.vector[index].1
351    }
352
353    #[allow(dead_code)]
354    fn granted(&self) -> &P {
355        &self.granted
356    }
357
358    #[allow(dead_code)]
359    fn quorum_set(&self) -> &QS {
360        &self.quorum_set
361    }
362
363    fn iter(&self) -> Iter<(ID, V)> {
364        self.vector.as_slice().iter()
365    }
366
367    fn upgrade_quorum_set(self, quorum_set: QS, leaner_ids: &[ID], default_v: V) -> Self {
368        let mut new_prog = Self::new(quorum_set, leaner_ids.iter().cloned(), default_v);
369
370        new_prog.stat = self.stat.clone();
371
372        for (id, v) in self.iter() {
373            let _ = new_prog.update(id, v.clone());
374        }
375        new_prog
376    }
377
378    fn is_voter(&self, id: &ID) -> Option<bool> {
379        let index = self.index(id)?;
380        Some(index < self.voter_count)
381    }
382}
383
/// Unit tests for `VecProgress`.
#[cfg(test)]
mod t {
    use std::borrow::Borrow;

    use super::Progress;
    use super::VecProgress;
    use crate::quorum::Joint;

    /// `new()` stores voters first, then learners, all with the default value.
    #[test]
    fn vec_progress_new() -> anyhow::Result<()> {
        let quorum_set: Vec<u64> = vec![0, 1, 2, 3, 4];
        let progress = VecProgress::<u64, u64, u64, _>::new(quorum_set, [6, 7], 0);

        assert_eq!(
            vec![
                //
                (0, 0),
                (1, 0),
                (2, 0),
                (3, 0),
                (4, 0),
                (6, 0),
                (7, 0),
            ],
            progress.vector
        );
        assert_eq!(5, progress.voter_count);

        Ok(())
    }

    /// `get()`, `try_get()` and `get_mut()` all address the entry of the given id.
    #[test]
    fn vec_progress_get() -> anyhow::Result<()> {
        let quorum_set: Vec<u64> = vec![0, 1, 2, 3, 4];
        let mut progress = VecProgress::<u64, u64, u64, _>::new(quorum_set, [6, 7], 0);

        let _ = progress.update(&6, 5);
        assert_eq!(&5, progress.get(&6));
        assert_eq!(Some(&5), progress.try_get(&6));
        assert_eq!(None, progress.try_get(&9));

        {
            let x = progress.get_mut(&6);
            if let Some(x) = x {
                *x = 10;
            }
        }
        assert_eq!(Some(&10), progress.try_get(&6));

        Ok(())
    }

    /// `iter()` yields voters (in their current order) before learners.
    #[test]
    fn vec_progress_iter() -> anyhow::Result<()> {
        let quorum_set: Vec<u64> = vec![0, 1, 2, 3, 4];
        let mut progress = VecProgress::<u64, u64, u64, _>::new(quorum_set, [6, 7], 0);

        let _ = progress.update(&7, 7);
        let _ = progress.update(&3, 3);
        let _ = progress.update(&1, 1);

        assert_eq!(
            vec![
                //
                (3, 3),
                (1, 1),
                (0, 0),
                (2, 0),
                (4, 0),
                (6, 0),
                (7, 7),
            ],
            progress.iter().copied().collect::<Vec<_>>(),
            "iter() returns voter first, followed by learners"
        );

        Ok(())
    }

    /// `move_up()` bubbles an element up past strictly smaller values and reports its new index.
    #[test]
    fn vec_progress_move_up() -> anyhow::Result<()> {
        let quorum_set: Vec<u64> = vec![0, 1, 2, 3, 4];
        let mut progress = VecProgress::<u64, u64, u64, _>::new(quorum_set, [6], 0);

        // Each case: ((id, new value), expected vector, expected new index).
        // initial: 0-0, 1-0, 2-0, 3-0, 4-0, 6-0
        let cases = [
            ((1, 2), &[(1, 2), (0, 0), (2, 0), (3, 0), (4, 0), (6, 0)], 0), //
            ((2, 3), &[(2, 3), (1, 2), (0, 0), (3, 0), (4, 0), (6, 0)], 0), //
            ((1, 3), &[(2, 3), (1, 3), (0, 0), (3, 0), (4, 0), (6, 0)], 1), // no move
            ((4, 8), &[(4, 8), (2, 3), (1, 3), (0, 0), (3, 0), (6, 0)], 0), //
            ((0, 5), &[(4, 8), (0, 5), (2, 3), (1, 3), (3, 0), (6, 0)], 1), // move to index 1
        ];
        for (ith, ((id, v), want_vec, want_new_index)) in cases.iter().enumerate() {
            // Update a value and move it up to keep the order.
            let index = progress.index(id).unwrap();
            progress.vector[index].1 = *v;
            let got = progress.move_up(index);

            assert_eq!(
                want_vec.as_slice(),
                &progress.vector,
                "{}-th case: idx:{}, v:{}",
                ith,
                *id,
                *v
            );
            assert_eq!(*want_new_index, got, "{}-th case: idx:{}, v:{}", ith, *id, *v);
        }
        Ok(())
    }

    /// `update_with()` returns the committed value after every update.
    #[test]
    fn vec_progress_update() -> anyhow::Result<()> {
        let quorum_set: Vec<u64> = vec![0, 1, 2, 3, 4];
        let mut progress = VecProgress::<u64, u64, u64, _>::new(quorum_set, [6], 0);

        // The trailing comments list the values of ids 0,1,2,3,4 and, after `|`, of learner 6.
        // initial: 0,0,0,0,0 | 0
        let cases = vec![
            ((6, 9), Ok(&0)),  // 0,0,0,0,0 | 9 // learner won't affect granted
            ((1, 2), Ok(&0)),  // 0,2,0,0,0 | 9
            ((2, 3), Ok(&0)),  // 0,2,3,0,0 | 9
            ((3, 1), Ok(&1)),  // 0,2,3,1,0 | 9
            ((4, 5), Ok(&2)),  // 0,2,3,1,5 | 9
            ((0, 4), Ok(&3)),  // 4,2,3,1,5 | 9
            ((3, 2), Ok(&3)),  // 4,2,3,2,5 | 9
            ((3, 3), Ok(&3)),  // 4,2,3,3,5 | 9
            ((1, 4), Ok(&4)),  // 4,4,3,3,5 | 9
            ((9, 1), Err(&4)), // nonexistent id, ignore.
        ];

        for (ith, ((id, v), want_committed)) in cases.iter().enumerate() {
            let got = progress.update_with(id, |x| *x = *v);
            assert_eq!(*want_committed, got, "{}-th case: id:{}, v:{}", ith, id, v);
        }
        Ok(())
    }

    /// Progress entry for testing: a progress value plus unrelated user data.
    #[derive(Clone, Copy, Debug, PartialEq, Eq)]
    struct ProgressEntry {
        progress: u64,
        user_data: &'static str,
    }

    impl Borrow<u64> for ProgressEntry {
        fn borrow(&self) -> &u64 {
            &self.progress
        }
    }

    /// `update()` stores the whole entry, while only the borrowed progress affects `granted`.
    #[test]
    fn vec_progress_update_struct_value() -> anyhow::Result<()> {
        let pv = |p, user_data| ProgressEntry { progress: p, user_data };

        let quorum_set: Vec<u64> = vec![0, 1, 2];
        let mut progress = VecProgress::<u64, ProgressEntry, u64, _>::new(quorum_set, [3], pv(0, "foo"));

        // The trailing comments list the progress of ids 0,1,2 and of learner 3.
        // initial: 0,0,0,0
        let cases = [
            (3, pv(9, "a"), Ok(&0)), // 0,0,0,9 // learner won't affect granted
            (1, pv(2, "b"), Ok(&0)), // 0,2,0,9
            (2, pv(3, "c"), Ok(&2)), // 0,2,3,9
            (1, pv(2, "d"), Ok(&2)), // 0,2,3,9 // No new granted, just update user data.
        ];

        for (ith, (id, v, want_committed)) in cases.iter().enumerate() {
            let got = progress.update(id, *v);
            assert_eq!(*want_committed, got, "{}-th case: id:{}, v:{:?}", ith, id, v);
        }

        // Check progress data

        assert_eq!(pv(0, "foo"), *progress.get(&0),);
        assert_eq!(pv(2, "d"), *progress.get(&1),);
        assert_eq!(pv(3, "c"), *progress.get(&2),);
        assert_eq!(pv(9, "a"), *progress.get(&3),);

        Ok(())
    }

    /// Updating a learner keeps it in place, while updating a voter moves the voter up.
    #[test]
    fn vec_progress_update_does_not_move_learner_elt() -> anyhow::Result<()> {
        let quorum_set: Vec<u64> = vec![0, 1, 2, 3, 4];
        let mut progress = VecProgress::<u64, u64, u64, _>::new(quorum_set, [6], 0);

        assert_eq!(Some(5), progress.index(&6));

        let _ = progress.update(&6, 6);
        assert_eq!(Some(5), progress.index(&6), "learner is not moved");

        let _ = progress.update(&4, 4);
        assert_eq!(Some(0), progress.index(&4), "voter is moved up to the front");
        Ok(())
    }

    /// `upgrade_quorum_set()` inherits progress and re-calculates `granted` for the new quorum.
    #[test]
    fn vec_progress_upgrade_quorum_set() -> anyhow::Result<()> {
        let qs012 = Joint::from(vec![vec![0, 1, 2]]);
        let qs012_345 = Joint::from(vec![vec![0, 1, 2], vec![3, 4, 5]]);
        let qs345 = Joint::from(vec![vec![3, 4, 5]]);

        // Initially, committed is 5

        let mut p012 = VecProgress::<u64, u64, u64, _>::new(qs012, [5], 0);

        let _ = p012.update(&0, 5);
        let _ = p012.update(&1, 6);
        let _ = p012.update(&5, 9);
        assert_eq!(&5, p012.granted());

        // After upgrading to a bigger quorum set, committed falls back to 0

        let mut p012_345 = p012.upgrade_quorum_set(qs012_345, &[6], 0);
        assert_eq!(
            &0,
            p012_345.granted(),
            "quorum extended from 012 to 012_345, committed falls back"
        );
        assert_eq!(&9, p012_345.get(&5), "inherit learner progress");

        // When quorum set shrinks, committed becomes greater.

        let _ = p012_345.update(&3, 7);
        let _ = p012_345.update(&4, 8);
        assert_eq!(&5, p012_345.granted());

        let p345 = p012_345.upgrade_quorum_set(qs345, &[1], 0);

        assert_eq!(&8, p345.granted(), "shrink quorum set, greater value becomes committed");
        assert_eq!(&6, p345.get(&1), "inherit voter progress");

        Ok(())
    }

    /// `is_voter()` distinguishes voters, learners and unknown ids.
    #[test]
    fn vec_progress_is_voter() -> anyhow::Result<()> {
        let quorum_set: Vec<u64> = vec![0, 1, 2, 3, 4];
        let progress = VecProgress::<u64, u64, u64, _>::new(quorum_set, [6, 7], 0);

        assert_eq!(Some(true), progress.is_voter(&1));
        assert_eq!(Some(true), progress.is_voter(&3));
        assert_eq!(Some(false), progress.is_voter(&7));
        assert_eq!(None, progress.is_voter(&8));

        Ok(())
    }
}