1#[cfg(feature = "bench")]
9#[cfg(test)]
10mod bench;
11pub(crate) mod entry;
12pub(crate) mod inflight;
13
14use std::borrow::Borrow;
15use std::fmt::Debug;
16use std::fmt::Display;
17use std::fmt::Formatter;
18use std::slice::Iter;
19use std::slice::IterMut;
20
21#[allow(unused_imports)]
23pub(crate) use inflight::Inflight;
24
25use crate::quorum::QuorumSet;
26
27pub(crate) trait Progress<ID, V, P, QS>
35where
36 ID: 'static,
37 V: Borrow<P>,
38 QS: QuorumSet<ID>,
39{
40 fn update_with<F>(&mut self, id: &ID, f: F) -> Result<&P, &P>
45 where F: FnOnce(&mut V);
46
47 fn update(&mut self, id: &ID, value: V) -> Result<&P, &P> {
51 self.update_with(id, |x| *x = value)
52 }
53
54 fn increase_to(&mut self, id: &ID, value: V) -> Result<&P, &P>
58 where V: PartialOrd {
59 self.update_with(id, |x| {
60 if value > *x {
61 *x = value;
62 }
63 })
64 }
65
66 #[allow(dead_code)]
68 fn try_get(&self, id: &ID) -> Option<&V>;
69
70 fn get_mut(&mut self, id: &ID) -> Option<&mut V>;
72
73 #[allow(dead_code)]
76 fn get(&self, id: &ID) -> &V;
77
78 #[allow(dead_code)]
84 fn granted(&self) -> &P;
85
86 #[allow(dead_code)]
88 fn quorum_set(&self) -> &QS;
89
90 fn iter(&self) -> Iter<(ID, V)>;
92
93 fn upgrade_quorum_set(self, quorum_set: QS, learner_ids: &[ID], default_v: V) -> Self;
95
96 fn is_voter(&self, id: &ID) -> Option<bool>;
103}
104
105#[derive(Clone, Debug)]
109#[derive(PartialEq, Eq)]
110pub(crate) struct VecProgress<ID, V, P, QS>
111where
112 ID: 'static,
113 QS: QuorumSet<ID>,
114{
115 quorum_set: QS,
117
118 granted: P,
120
121 voter_count: usize,
123
124 vector: Vec<(ID, V)>,
134
135 stat: Stat,
137}
138
139impl<ID, V, P, QS> Display for VecProgress<ID, V, P, QS>
140where
141 ID: PartialEq + Debug + Clone + 'static,
142 V: Clone + 'static,
143 V: Borrow<P>,
144 P: PartialOrd + Ord + Clone + 'static,
145 QS: QuorumSet<ID> + 'static,
146 ID: Display,
147 V: Display,
148{
149 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
150 write!(f, "{{")?;
151 for (i, (id, v)) in self.iter().enumerate() {
152 if i > 0 {
153 write!(f, ", ")?;
154 }
155 write!(f, "{}: {}", id, v)?
156 }
157 write!(f, "}}")?;
158
159 Ok(())
160 }
161}
162
/// Counters collected by `VecProgress` while it processes updates.
#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub(crate) struct Stat {
    /// Incremented once per `update_with` call, including calls for unknown ids.
    update_count: u64,

    /// Incremented once per `move_up` call.
    move_count: u64,

    /// Incremented once per `QuorumSet::is_quorum` check made while re-calculating `granted`.
    is_quorum_count: u64,
}
170
171impl<ID, V, P, QS> VecProgress<ID, V, P, QS>
172where
173 ID: PartialEq + Clone + Debug + 'static,
174 V: Clone + 'static,
175 V: Borrow<P>,
176 P: PartialOrd + Ord + Clone + 'static,
177 QS: QuorumSet<ID>,
178{
179 pub(crate) fn new(quorum_set: QS, learner_ids: impl IntoIterator<Item = ID>, default_v: V) -> Self {
180 let mut vector = quorum_set.ids().map(|id| (id, default_v.clone())).collect::<Vec<_>>();
181
182 let voter_count = vector.len();
183
184 vector.extend(learner_ids.into_iter().map(|id| (id, default_v.clone())));
185
186 Self {
187 quorum_set,
188 granted: default_v.borrow().clone(),
189 voter_count,
190 vector,
191 stat: Default::default(),
192 }
193 }
194
195 #[inline(always)]
197 pub(crate) fn index(&self, target: &ID) -> Option<usize> {
198 for (i, elt) in self.vector.iter().enumerate() {
199 if elt.0 == *target {
200 return Some(i);
201 }
202 }
203
204 None
205 }
206
207 #[inline(always)]
209 fn move_up(&mut self, index: usize) -> usize {
210 self.stat.move_count += 1;
211 for i in (0..index).rev() {
212 if self.vector[i].1.borrow() < self.vector[i + 1].1.borrow() {
213 self.vector.swap(i, i + 1);
214 } else {
215 return i + 1;
216 }
217 }
218
219 0
220 }
221
222 pub(crate) fn iter_mut(&mut self) -> IterMut<(ID, V)> {
223 self.vector.iter_mut()
224 }
225
226 #[allow(dead_code)]
227 pub(crate) fn stat(&self) -> &Stat {
228 &self.stat
229 }
230}
231
232impl<ID, V, P, QS> Progress<ID, V, P, QS> for VecProgress<ID, V, P, QS>
233where
234 ID: PartialEq + Debug + Clone + 'static,
235 V: Clone + 'static,
236 V: Borrow<P>,
237 P: PartialOrd + Ord + Clone + 'static,
238 QS: QuorumSet<ID> + 'static,
239{
240 fn update_with<F>(&mut self, id: &ID, f: F) -> Result<&P, &P>
274 where F: FnOnce(&mut V) {
275 self.stat.update_count += 1;
276
277 let index = match self.index(id) {
278 None => {
279 return Err(&self.granted);
280 }
281 Some(x) => x,
282 };
283
284 let elt = &mut self.vector[index];
285
286 let prev_progress = elt.1.borrow().clone();
287
288 f(&mut elt.1);
289
290 let new_progress = elt.1.borrow();
291
292 debug_assert!(new_progress >= &prev_progress,);
293
294 let prev_le_granted = prev_progress <= self.granted;
295 let new_gt_granted = new_progress > &self.granted;
296
297 if &prev_progress == new_progress {
298 return Ok(&self.granted);
299 }
300
301 if index >= self.voter_count {
304 return Ok(&self.granted);
305 }
306
307 if prev_le_granted && new_gt_granted {
310 let new_index = self.move_up(index);
311
312 for i in new_index..self.voter_count {
314 let prog = self.vector[i].1.borrow();
315
316 if prog <= &self.granted {
318 break;
319 }
320
321 let it = self.vector[0..=i].iter().map(|x| &x.0);
323
324 self.stat.is_quorum_count += 1;
325
326 if self.quorum_set.is_quorum(it) {
327 self.granted = prog.clone();
328 break;
329 }
330 }
331 }
332
333 Ok(&self.granted)
334 }
335
336 #[allow(dead_code)]
337 fn try_get(&self, id: &ID) -> Option<&V> {
338 let index = self.index(id)?;
339 Some(&self.vector[index].1)
340 }
341
342 fn get_mut(&mut self, id: &ID) -> Option<&mut V> {
343 let index = self.index(id)?;
344 Some(&mut self.vector[index].1)
345 }
346
347 #[allow(dead_code)]
348 fn get(&self, id: &ID) -> &V {
349 let index = self.index(id).unwrap();
350 &self.vector[index].1
351 }
352
353 #[allow(dead_code)]
354 fn granted(&self) -> &P {
355 &self.granted
356 }
357
358 #[allow(dead_code)]
359 fn quorum_set(&self) -> &QS {
360 &self.quorum_set
361 }
362
363 fn iter(&self) -> Iter<(ID, V)> {
364 self.vector.as_slice().iter()
365 }
366
367 fn upgrade_quorum_set(self, quorum_set: QS, leaner_ids: &[ID], default_v: V) -> Self {
368 let mut new_prog = Self::new(quorum_set, leaner_ids.iter().cloned(), default_v);
369
370 new_prog.stat = self.stat.clone();
371
372 for (id, v) in self.iter() {
373 let _ = new_prog.update(id, v.clone());
374 }
375 new_prog
376 }
377
378 fn is_voter(&self, id: &ID) -> Option<bool> {
379 let index = self.index(id)?;
380 Some(index < self.voter_count)
381 }
382}
383
/// Unit tests for `VecProgress`.
#[cfg(test)]
mod t {
    use std::borrow::Borrow;

    use super::Progress;
    use super::VecProgress;
    use crate::quorum::Joint;

    /// `new` stores voters first, then learners, all at the default value.
    #[test]
    fn vec_progress_new() -> anyhow::Result<()> {
        let quorum_set: Vec<u64> = vec![0, 1, 2, 3, 4];
        let progress = VecProgress::<u64, u64, u64, _>::new(quorum_set, [6, 7], 0);

        assert_eq!(
            vec![
                (0, 0),
                (1, 0),
                (2, 0),
                (3, 0),
                (4, 0),
                (6, 0),
                (7, 0),
            ],
            progress.vector
        );
        assert_eq!(5, progress.voter_count);

        Ok(())
    }

    /// `get`, `try_get` and `get_mut` read and write the value of a tracked id.
    #[test]
    fn vec_progress_get() -> anyhow::Result<()> {
        let quorum_set: Vec<u64> = vec![0, 1, 2, 3, 4];
        let mut progress = VecProgress::<u64, u64, u64, _>::new(quorum_set, [6, 7], 0);

        let _ = progress.update(&6, 5);
        assert_eq!(&5, progress.get(&6));
        assert_eq!(Some(&5), progress.try_get(&6));
        assert_eq!(None, progress.try_get(&9));

        {
            let x = progress.get_mut(&6);
            if let Some(x) = x {
                *x = 10;
            }
        }
        assert_eq!(Some(&10), progress.try_get(&6));

        Ok(())
    }

    /// `iter` yields the (reordered) voters first and the learners after them.
    #[test]
    fn vec_progress_iter() -> anyhow::Result<()> {
        let quorum_set: Vec<u64> = vec![0, 1, 2, 3, 4];
        let mut progress = VecProgress::<u64, u64, u64, _>::new(quorum_set, [6, 7], 0);

        let _ = progress.update(&7, 7);
        let _ = progress.update(&3, 3);
        let _ = progress.update(&1, 1);

        assert_eq!(
            vec![
                (3, 3),
                (1, 1),
                (0, 0),
                (2, 0),
                (4, 0),
                (6, 0),
                (7, 7),
            ],
            progress.iter().copied().collect::<Vec<_>>(),
            "iter() returns voter first, followed by learners"
        );

        Ok(())
    }

    /// `move_up` bubbles an entry towards the front and returns its new index.
    #[test]
    fn vec_progress_move_up() -> anyhow::Result<()> {
        let quorum_set: Vec<u64> = vec![0, 1, 2, 3, 4];
        let mut progress = VecProgress::<u64, u64, u64, _>::new(quorum_set, [6], 0);

        // ((id, new value), expected vector afterwards, expected new index)
        let cases = [
            ((1, 2), &[(1, 2), (0, 0), (2, 0), (3, 0), (4, 0), (6, 0)], 0),
            ((2, 3), &[(2, 3), (1, 2), (0, 0), (3, 0), (4, 0), (6, 0)], 0),
            ((1, 3), &[(2, 3), (1, 3), (0, 0), (3, 0), (4, 0), (6, 0)], 1),
            ((4, 8), &[(4, 8), (2, 3), (1, 3), (0, 0), (3, 0), (6, 0)], 0),
            ((0, 5), &[(4, 8), (0, 5), (2, 3), (1, 3), (3, 0), (6, 0)], 1),
        ];
        for (ith, ((id, v), want_vec, want_new_index)) in cases.iter().enumerate() {
            let index = progress.index(id).unwrap();
            progress.vector[index].1 = *v;
            let got = progress.move_up(index);

            assert_eq!(
                want_vec.as_slice(),
                &progress.vector,
                "{}-th case: idx:{}, v:{}",
                ith,
                *id,
                *v
            );
            assert_eq!(*want_new_index, got, "{}-th case: idx:{}, v:{}", ith, *id, *v);
        }
        Ok(())
    }

    /// `update_with` returns the granted value after each update, or `Err` for an unknown id.
    #[test]
    fn vec_progress_update() -> anyhow::Result<()> {
        let quorum_set: Vec<u64> = vec![0, 1, 2, 3, 4];
        let mut progress = VecProgress::<u64, u64, u64, _>::new(quorum_set, [6], 0);

        // ((id, new value), expected result)
        let cases = vec![
            ((6, 9), Ok(&0)),  // a learner never changes the granted value
            ((1, 2), Ok(&0)),  // one voter above 0: no quorum
            ((2, 3), Ok(&0)),  // two voters above 0: no quorum
            ((3, 1), Ok(&1)),  // voters 2, 1, 3 have reached 1
            ((4, 5), Ok(&2)),  // voters 4, 2, 1 have reached 2
            ((0, 4), Ok(&3)),  // voters 4, 0, 2 have reached 3
            ((3, 2), Ok(&3)),  // not above the granted value: no change
            ((3, 3), Ok(&3)),  // not above the granted value: no change
            ((1, 4), Ok(&4)),  // voters 4, 0, 1 have reached 4
            ((9, 1), Err(&4)), // unknown id
        ];

        for (ith, ((id, v), want_committed)) in cases.iter().enumerate() {
            let got = progress.update_with(id, |x| *x = *v);
            assert_eq!(want_committed.clone(), got, "{}-th case: id:{}, v:{}", ith, id, v);
        }
        Ok(())
    }

    /// A value type that carries user data next to the progress it exposes via `Borrow<u64>`.
    #[derive(Clone, Copy, Debug, PartialEq, Eq)]
    struct ProgressEntry {
        progress: u64,
        user_data: &'static str,
    }

    impl Borrow<u64> for ProgressEntry {
        fn borrow(&self) -> &u64 {
            &self.progress
        }
    }

    /// The whole value is stored even when its progress part does not change.
    #[test]
    fn vec_progress_update_struct_value() -> anyhow::Result<()> {
        let pv = |p, user_data| ProgressEntry { progress: p, user_data };

        let quorum_set: Vec<u64> = vec![0, 1, 2];
        let mut progress = VecProgress::<u64, ProgressEntry, u64, _>::new(quorum_set, [3], pv(0, "foo"));

        // (id, new value, expected result)
        let cases = [
            (3, pv(9, "a"), Ok(&0)), // learner
            (1, pv(2, "b"), Ok(&0)),
            (2, pv(3, "c"), Ok(&2)),
            (1, pv(2, "d"), Ok(&2)), // same progress, different user data
        ];

        for (ith, (id, v, want_committed)) in cases.iter().enumerate() {
            let got = progress.update(id, *v);
            assert_eq!(want_committed.clone(), got, "{}-th case: id:{}, v:{:?}", ith, id, v);
        }

        assert_eq!(pv(0, "foo"), *progress.get(&0));
        assert_eq!(pv(2, "d"), *progress.get(&1));
        assert_eq!(pv(3, "c"), *progress.get(&2));
        assert_eq!(pv(9, "a"), *progress.get(&3));

        Ok(())
    }

    /// Updating a learner leaves it in place; updating a voter moves it up.
    #[test]
    fn vec_progress_update_does_not_move_learner_elt() -> anyhow::Result<()> {
        let quorum_set: Vec<u64> = vec![0, 1, 2, 3, 4];
        let mut progress = VecProgress::<u64, u64, u64, _>::new(quorum_set, [6], 0);

        assert_eq!(Some(5), progress.index(&6));

        let _ = progress.update(&6, 6);
        assert_eq!(Some(5), progress.index(&6), "learner is not moved");

        let _ = progress.update(&4, 4);
        assert_eq!(Some(0), progress.index(&4), "voter is moved to the front");
        Ok(())
    }

    /// Values survive a change of the quorum set and the granted value is re-calculated.
    #[test]
    fn vec_progress_upgrade_quorum_set() -> anyhow::Result<()> {
        let qs012 = Joint::from(vec![vec![0, 1, 2]]);
        let qs012_345 = Joint::from(vec![vec![0, 1, 2], vec![3, 4, 5]]);
        let qs345 = Joint::from(vec![vec![3, 4, 5]]);

        let mut p012 = VecProgress::<u64, u64, u64, _>::new(qs012, [5], 0);

        let _ = p012.update(&0, 5);
        let _ = p012.update(&1, 6);
        let _ = p012.update(&5, 9);
        assert_eq!(&5, p012.granted());

        let mut p012_345 = p012.upgrade_quorum_set(qs012_345, &[6], 0);
        assert_eq!(
            &0,
            p012_345.granted(),
            "quorum extended from 012 to 012_345, committed falls back"
        );
        assert_eq!(&9, p012_345.get(&5), "inherit learner progress");

        let _ = p012_345.update(&3, 7);
        let _ = p012_345.update(&4, 8);
        assert_eq!(&5, p012_345.granted());

        let p345 = p012_345.upgrade_quorum_set(qs345, &[1], 0);

        assert_eq!(&8, p345.granted(), "shrink quorum set, greater value becomes committed");
        assert_eq!(&6, p345.get(&1), "inherit voter progress");

        Ok(())
    }

    /// `is_voter` distinguishes voters, learners and unknown ids.
    #[test]
    fn vec_progress_is_voter() -> anyhow::Result<()> {
        let quorum_set: Vec<u64> = vec![0, 1, 2, 3, 4];
        let progress = VecProgress::<u64, u64, u64, _>::new(quorum_set, [6, 7], 0);

        assert_eq!(Some(true), progress.is_voter(&1));
        assert_eq!(Some(true), progress.is_voter(&3));
        assert_eq!(Some(false), progress.is_voter(&7));
        assert_eq!(None, progress.is_voter(&8));

        Ok(())
    }
}