sdcons/
progress.rs

1// Copyright 2021 The sdcons Authors. Licensed under Apache-2.0.
2
3// Copyright 2015 The etcd Authors
4//
5// Licensed under the Apache License, Version 2.0 (the "License");
6// you may not use this file except in compliance with the License.
7// You may obtain a copy of the License at
8//
9//     http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing, software
12// distributed under the License is distributed on an "AS IS" BASIS,
13// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14// See the License for the specific language governing permissions and
15// limitations under the License.
16
17use std::collections::{HashMap, HashSet};
18
19use crate::constant::*;
20use crate::types::{ConfigStage, Entry, EntryMeta, HardState, MemberState};
21
22use chrono::{prelude::*, DateTime};
23use log::{debug, error, info, trace, warn};
24
/// Role a node currently plays inside a channel.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub enum Role {
    Leader,
    Student,
    Follower,
    Candidate,
}

/// Renders a role as its bare variant name, e.g. `Leader`.
impl std::fmt::Display for Role {
    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        let name = match self {
            Role::Leader => "Leader",
            Role::Student => "Student",
            Role::Follower => "Follower",
            Role::Candidate => "Candidate",
        };
        f.write_str(name)
    }
}
38
39#[inline(always)]
40fn clone_last_log_meta(metas: &Vec<EntryMeta>) -> EntryMeta {
41    metas
42        .last()
43        .map(|e| e.clone())
44        .unwrap_or(EntryMeta::default())
45}
46
/// Returns the smallest number of members that forms a strict majority of a
/// group of `len` members, i.e. `len / 2 + 1`.
///
/// The previous formula `(len + 1) / 2` only yields a majority for odd sizes;
/// for even sizes it returned exactly half (e.g. 2 of 4), which lets two
/// disjoint sets both count as a "majority". With `len / 2 + 1` any two
/// majorities of the same group always share at least one member.
///
/// For `len == 0` the result is `1`, so an empty group can never reach a
/// majority. For every `len >= 1` the result is in `1..=len`.
#[inline(always)]
pub fn majority(len: usize) -> usize {
    len / 2 + 1
}
51
/// Snapshot activity marker.
// NOTE(review): the code that sets and reads this state is not part of this
// file; the variant descriptions below are taken from the names only.
#[derive(Debug, Eq, PartialEq)]
pub enum SnapshotState {
    /// Presumably: a snapshot is currently being created.
    Creating,
    /// Presumably: a snapshot is currently being loaded.
    Loading,
    /// Presumably: no snapshot activity is in progress.
    None,
}
58
/// Bookkeeping for an incoming snapshot transfer.
// NOTE(review): nothing in this file reads or writes these fields; the units of
// the two timestamps are not visible here — confirm against the users of this type.
#[derive(Debug)]
pub(crate) struct SnapshotRecevingState {
    /// Whether a snapshot is currently being received.
    pub receiving: bool,
    /// Timestamp at which receiving started (unit not visible here).
    pub start_at: i64,
    /// Timestamp of the most recent heartbeat (unit not visible here).
    pub last_heartbeat_at: i64,
}
65
/// Fixed-capacity ring buffer of ids that have been handed out and not yet
/// released.
///
/// Ids live in `buf` starting at slot `pos` and wrap around at `cap`.
/// `free_to` releases the leading run of ids that are `<= to`.
#[derive(Debug, Clone)]
pub struct Inflights {
    /// Slot in `buf` holding the oldest tracked id.
    pub pos: usize,
    /// Number of ids currently tracked.
    pub cnt: usize,
    /// Maximum number of ids that can be tracked at the same time.
    pub cap: usize,
    /// Backing storage; `new` allocates it with `cap` zeroed slots.
    pub buf: Vec<u64>,
}

impl Inflights {
    /// Creates an empty window able to hold `cap` ids.
    fn new(cap: usize) -> Inflights {
        Inflights {
            pos: 0,
            cnt: 0,
            cap,
            buf: vec![0; cap],
        }
    }

    /// Returns `true` when no further id can be added.
    fn full(&self) -> bool {
        self.cap == self.cnt
    }

    /// Forgets every tracked id. The backing storage is left untouched.
    fn reset(&mut self) {
        self.pos = 0;
        self.cnt = 0;
    }

    /// Appends `id` behind the newest tracked id. The call is silently ignored
    /// when the window is already full.
    fn add(&mut self, id: u64) {
        if !self.full() {
            let slot = (self.pos + self.cnt) % self.cap;
            self.buf[slot] = id;
            self.cnt += 1;
        }
    }

    /// Releases every id at the front of the window that is `<= to`.
    ///
    /// Nothing happens when the window is empty or `to` is smaller than the
    /// oldest tracked id. Once the window becomes empty `pos` is rewound to 0.
    fn free_to(&mut self, to: u64) {
        if self.cnt == 0 || to < self.buf[self.pos] {
            // `to` lies left of the window.
            return;
        }

        // Length of the leading run of ids that are covered by `to`.
        let freed = (0..self.cnt)
            .take_while(|off| self.buf[(self.pos + off) % self.cap] <= to)
            .count();

        self.cnt -= freed;
        self.pos = if self.cnt == 0 {
            0
        } else {
            (self.pos + freed) % self.cap
        };
    }
}
128
/// Replication state of a single member, as tracked by `Progress`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ProgressState {
    /// `Progress::is_paused` reports the `paused` flag; `Progress::replicate_to`
    /// sets that flag.
    Probe,
    /// Sent ids are recorded in `Progress::inflights`; the progress counts as
    /// paused while that window is full.
    Replicate,
    /// Entered through `Progress::step_building_msg`; the progress always
    /// counts as paused until `Progress::step_builded` returns it to `Probe`.
    MsgBuilding,
}
135
/// Per-member bookkeeping kept in `ChannelInfo::progress_map`.
#[derive(Debug, Clone)]
pub struct Progress {
    /// Id of the member this progress belongs to.
    pub node_id: u64,

    /// Largest id recorded as matched for this member; `INVALID_ID` after a reset.
    pub match_id: u64,
    /// Id the next replication attempt starts from (presumably — the sender
    /// lives outside this file).
    pub next_id: u64,

    /// Set by `ChannelInfo::receive_promise`, cleared by `reset`.
    pub granted: bool,
    /// Set when a message from this member is received; cleared and counted by
    /// `ChannelInfo::advance_quorum_lease`.
    pub active: bool,

    /// Pause flag consulted by `is_paused` while in `ProgressState::Probe`.
    pub paused: bool,
    /// Window of ids sent while in `ProgressState::Replicate`.
    pub inflights: Inflights,
    /// Current replication state.
    pub state: ProgressState,
    /// Configuration stage this member is assigned to.
    pub belongs_to: ConfigStage,
}
151
152impl Progress {
153    pub fn new(node_id: u64, belongs_to: ConfigStage) -> Progress {
154        Progress {
155            node_id,
156            match_id: INVALID_ID,
157            next_id: INVALID_ID + 1,
158
159            granted: false,
160            active: false,
161
162            paused: false,
163            inflights: Inflights::new(128),
164            state: ProgressState::Probe,
165            belongs_to,
166        }
167    }
168
169    pub fn reset(&mut self, last_id: u64) {
170        self.match_id = INVALID_ID;
171        self.next_id = last_id + 1;
172        self.granted = false;
173        self.active = true;
174        self.state = ProgressState::Probe;
175    }
176
177    pub fn try_advance_matched_id(&mut self, hint_id: u64) -> bool {
178        self.paused = false;
179        if self.match_id + 1u64 < hint_id {
180            self.match_id = hint_id - 1u64;
181            if self.next_id <= self.match_id {
182                self.next_id = self.match_id + 1u64;
183            }
184            match &self.state {
185                ProgressState::Probe => {
186                    self.inflights.reset();
187                    self.state = ProgressState::Replicate;
188                }
189                ProgressState::Replicate => {
190                    self.inflights.free_to(self.match_id);
191                }
192                _ => {}
193            }
194            true
195        } else {
196            false
197        }
198    }
199
200    pub fn try_reset_next_id(&mut self, reject_id: u64, hint_id: u64) -> bool {
201        self.paused = false;
202        // Ignore staled reply.
203        match &self.state {
204            ProgressState::Replicate if reject_id > self.match_id => {
205                self.state = ProgressState::Probe;
206                self.next_id = self.match_id + 1u64;
207                true
208            }
209            ProgressState::Probe if reject_id == self.next_id => {
210                self.next_id = std::cmp::max(1, hint_id);
211                true
212            }
213            _ => false,
214        }
215    }
216
    /// Marks this member as active. The flag is cleared again by
    /// `ChannelInfo::advance_quorum_lease`.
    pub fn on_receive_msg(&mut self) {
        self.active = true;
    }
220
221    pub fn is_paused(&self) -> bool {
222        match &self.state {
223            ProgressState::Probe => self.paused,
224            ProgressState::MsgBuilding => true,
225            ProgressState::Replicate => self.inflights.full(),
226        }
227    }
228
    /// Switches to `MsgBuilding`, in which `is_paused` always returns `true`.
    pub fn step_building_msg(&mut self) {
        self.state = ProgressState::MsgBuilding;
    }
232
233    pub fn step_builded(&mut self, hint_id: u64) {
234        self.state = ProgressState::Probe;
235        self.paused = false;
236        if self.match_id <= hint_id {
237            self.next_id = hint_id;
238        }
239    }
240
241    pub fn replicate_to(&mut self, last_id: u64) {
242        if last_id <= self.match_id {
243            return;
244        }
245        match &self.state {
246            ProgressState::Probe => {
247                self.paused = true;
248            }
249            ProgressState::Replicate => {
250                self.inflights.add(last_id);
251                self.next_id = last_id + 1u64;
252            }
253            _ => {}
254        }
255    }
256
257    pub fn belongs_to_stage(&self, stage: ConfigStage) -> bool {
258        self.belongs_to == stage || self.belongs_to == ConfigStage::Both
259    }
260}
261
/// Initial description of a channel, consumed by `ChannelInfo::new`.
#[derive(Debug)]
pub struct ChannelDesc {
    /// Id of the channel.
    pub channel_id: u64,
    /// Committed id the channel starts with.
    pub committed_id: u64,
    /// Last id the channel starts with; every member's progress is reset to it.
    pub last_id: u64,
    /// Source of the initial `voted_for` and `current_term`.
    pub hard_state: HardState,
    /// Members of the channel keyed by node id; each entry supplies the
    /// member's configuration stage.
    pub members: HashMap<u64, MemberState>,
}
270
/// Role, term, commit and per-member progress state of one channel on the
/// local node.
#[derive(Debug, Clone)]
pub struct ChannelInfo {
    /// Id of the local node.
    pub local_id: u64,
    /// Id of the channel this state belongs to.
    pub channel_id: u64,

    /// Progress of every member (the local node included), keyed by node id.
    pub progress_map: HashMap<u64, Progress>,

    /// Node this node has promised its vote to, or `INVALID_NODE_ID`.
    pub voted_for: u64,
    /// Current term.
    pub term: u64,
    /// Highest id known to be committed.
    pub committed_id: u64,
    /// Last id recorded when the current term was entered (see `to_candidate`
    /// and `to_follower`).
    pub prev_term_last_id: u64,

    /// Id of the known leader, or `INVALID_NODE_ID`.
    pub leader_id: u64,
    /// Role currently played by the local node.
    pub role: Role,

    /// Initialised from `Local::now().timestamp_nanos()` in `new`; not updated
    /// anywhere in this file.
    pub last_liveness_at: i64,
    /// Tick counter, reset to zero on role changes and by `reset_tick`.
    pub elapsed_tick: u32,

    /// Entry metas learned from a remote node; only used for the command instance.
    pub max_received_entries: Vec<EntryMeta>,

    /// Ids of the members that had not granted their promise when this node
    /// became a student (filled by `to_student`).
    pub missed_voters: HashSet<u64>,
    /// Emptied by `to_student`; ids in this set are left out by
    /// `missed_channel_ids`.
    pub learned_voters: HashSet<u64>,
}
296
297impl ChannelInfo {
298    pub fn build(local_id: u64, channel_id: u64, descs: HashMap<u64, MemberState>) -> ChannelInfo {
299        let desc = ChannelDesc {
300            channel_id,
301            committed_id: INVALID_ID,
302            last_id: INVALID_ID,
303            hard_state: HardState::default(),
304            members: descs,
305        };
306        ChannelInfo::new(local_id, &desc)
307    }
308
309    pub fn new(local_id: u64, channel_desc: &ChannelDesc) -> ChannelInfo {
310        let mut progress_map = channel_desc
311            .members
312            .iter()
313            .map(|(id, desc)| (*id, Progress::new(*id, desc.stage)))
314            .collect::<HashMap<u64, Progress>>();
315
316        let mut initial_local_matched_id = 0;
317        for (id, p) in &mut progress_map {
318            p.reset(channel_desc.last_id);
319            // We need apply chosen entries immediately, so update match index to advance
320            // safety committed id.
321            if *id == local_id {
322                p.match_id = channel_desc.last_id;
323                initial_local_matched_id = channel_desc.last_id;
324            }
325        }
326
327        info!(
328            "node {} setup channel {}, voted_for {}, term {}, committed_id {}, last_id {}, matched_id {}",
329            local_id,
330            channel_desc.channel_id,
331            channel_desc.hard_state.voted_for,
332            channel_desc.hard_state.current_term,
333            channel_desc.committed_id,
334            channel_desc.last_id,
335            initial_local_matched_id
336        );
337
338        ChannelInfo {
339            local_id,
340            channel_id: channel_desc.channel_id,
341            progress_map,
342            voted_for: channel_desc.hard_state.voted_for,
343            term: channel_desc.hard_state.current_term,
344            committed_id: channel_desc.committed_id,
345            prev_term_last_id: channel_desc.last_id,
346            leader_id: INVALID_NODE_ID,
347            role: Role::Follower,
348            elapsed_tick: 0,
349            last_liveness_at: Local::now().timestamp_nanos(),
350            max_received_entries: Vec::new(),
351            missed_voters: HashSet::new(),
352            learned_voters: HashSet::new(),
353        }
354    }
355
356    pub fn current_term(&self) -> u64 {
357        return self.term;
358    }
359
    /// Returns a copy of the last entry meta in `max_received_entries`, or the
    /// default entry meta when nothing has been learned yet.
    pub fn last_learned_entry_meta(&self) -> EntryMeta {
        clone_last_log_meta(&self.max_received_entries)
    }
363
364    pub fn try_receive_prepare_entries(&mut self, entries: Vec<EntryMeta>) -> bool {
365        let local_last = clone_last_log_meta(&self.max_received_entries);
366        let remote_last = clone_last_log_meta(&entries);
367        if remote_last.term > local_last.term
368            || (remote_last.term == local_last.term && local_last.id < remote_last.id)
369        {
370            debug!(
371                "node {} channel {} receive prepare entries last id {}, term: {}",
372                self.local_id, self.channel_id, remote_last.id, remote_last.term
373            );
374            self.max_received_entries = entries;
375            true
376        } else {
377            false
378        }
379    }
380
    /// Restarts the tick counter from zero.
    pub fn reset_tick(&mut self) {
        self.elapsed_tick = 0;
    }
388
    /// Marks member `from` as active after a message from it was received.
    /// Messages from nodes without a progress entry are ignored.
    pub fn on_receive_msg(&mut self, from: u64) {
        if let Some(p) = self.progress_map.get_mut(&from) {
            p.on_receive_msg();
        }
    }
398
399    pub fn advance_quorum_lease(&mut self) -> usize {
400        let mut count = 0;
401        for (_, p) in &mut self.progress_map {
402            if p.active {
403                count += 1;
404                p.active = false;
405            }
406        }
407        count
408    }
409
410    fn reset(&mut self, last_id: u64) {
411        let local_id = self.local_id;
412        self.progress_map.iter_mut().for_each(|(id, progress)| {
413            let last_match_id = progress.match_id;
414            progress.reset(last_id);
415            if *id == local_id {
416                progress.match_id = std::cmp::max(last_match_id, last_id);
417            }
418        });
419    }
420
421    pub fn to_leader(&mut self, last_id: u64, reason: &'static str) {
422        let _prev_role = self.to(Role::Leader, reason);
423        self.leader_id = self.local_id;
424        self.reset(last_id);
425    }
426
427    pub fn to_student(&mut self, last_id: u64, reason: &'static str) {
428        let _prev_role = self.to(Role::Student, reason);
429        self.leader_id = self.local_id;
430        self.missed_voters = self
431            .progress_map
432            .iter()
433            .filter(|(_, p)| !p.granted)
434            .map(|(id, _)| *id)
435            .collect::<HashSet<_>>();
436        self.learned_voters = HashSet::new();
437        self.max_received_entries.clear();
438        self.reset(last_id);
439    }
440
441    pub fn to_candidate(&mut self, last_id: u64, reason: &'static str) {
442        let _prev_role = self.to(Role::Candidate, reason);
443        self.term += 1;
444        self.leader_id = INVALID_NODE_ID;
445        self.prev_term_last_id = last_id;
446        self.voted_for = self.local_id;
447        self.max_received_entries.clear();
448        self.reset(INVALID_ID);
449    }
450
451    pub fn to_follower(
452        &mut self,
453        leader_id: u64,
454        target_term: u64,
455        last_id: u64,
456        reason: &'static str,
457    ) {
458        let _prev_role = self.to(Role::Follower, reason);
459        if self.term < target_term {
460            info!(
461                "node {} channel {} advance term from {} to {}",
462                self.local_id, self.channel_id, self.term, target_term
463            );
464            self.term = target_term;
465            self.voted_for = INVALID_NODE_ID;
466            self.prev_term_last_id = last_id;
467        }
468        if self.leader_id != leader_id {
469            info!(
470                "node {} channel {} follow leader {} from {} at term {}",
471                self.local_id, self.channel_id, leader_id, self.leader_id, self.term
472            );
473            self.leader_id = leader_id;
474        }
475        self.reset(INVALID_ID);
476    }
477
478    fn to(&mut self, target_role: Role, reason: &'static str) -> Role {
479        info!(
480            "node {} channel {} change role from {} to {} at term {}, reason: {}",
481            self.local_id, self.channel_id, self.role, target_role, self.term, reason
482        );
483        let prev_role = self.role;
484        self.role = target_role;
485        self.elapsed_tick = 0;
486        prev_role
487    }
488
489    pub fn is_already_promise_others(&self, node_id: u64) -> bool {
490        // If current node has voted before and the voted node isn't msg.from
491        self.voted_for != INVALID_NODE_ID && self.voted_for != node_id
492    }
493
494    pub fn try_make_promise(&mut self, node_id: u64) -> bool {
495        if !self.is_already_promise_others(node_id) {
496            debug!(
497                "node {} channel {} take promise for {}",
498                self.local_id, self.channel_id, node_id
499            );
500            self.voted_for = node_id;
501            self.reset_tick();
502            true
503        } else {
504            false
505        }
506    }
507
508    pub fn receive_promise(&mut self, node_id: u64) {
509        match self.progress_map.get_mut(&node_id) {
510            Some(p) => p.granted = true,
511            None => {}
512        };
513    }
514
515    pub fn stage_majority(&self, stage: ConfigStage) -> usize {
516        majority(
517            self.progress_map
518                .iter()
519                .filter(|(_, p)| p.belongs_to_stage(stage))
520                .count(),
521        )
522    }
523
524    pub fn current_term_safe_commit_id(&self) -> u64 {
525        std::cmp::max(self.committed_id, self.prev_term_last_id)
526    }
527
528    fn receive_stage_majority_promise(&self, stage: ConfigStage) -> bool {
529        let granted_members = self
530            .progress_map
531            .iter()
532            .filter(|(_, p)| p.granted && p.belongs_to_stage(stage))
533            .map(|(k, _)| *k)
534            .collect::<Vec<u64>>();
535
536        return granted_members.len() >= self.stage_majority(stage);
537    }
538
539    pub fn receive_majority_promise(&self, stage: ConfigStage) -> bool {
540        match &stage {
541            ConfigStage::Both => {
542                self.receive_stage_majority_promise(ConfigStage::Old)
543                    && self.receive_stage_majority_promise(ConfigStage::New)
544            }
545            _ => self.receive_stage_majority_promise(ConfigStage::New),
546        }
547    }
548
549    fn compute_candidate_id(&self, stage: ConfigStage) -> u64 {
550        let mut match_ids = self
551            .progress_map
552            .iter()
553            .filter(|(_, p)| p.belongs_to_stage(stage))
554            .map(|(_, p)| p.match_id)
555            .collect::<Vec<_>>();
556        match_ids.sort();
557        let total_numbers = match_ids.len();
558        if total_numbers == 0 {
559            self.committed_id
560        } else {
561            match_ids[total_numbers - majority(total_numbers)]
562        }
563    }
564
565    pub fn try_advance_committed_id(&mut self, stage: ConfigStage) -> bool {
566        let candidate_id = match &stage {
567            ConfigStage::Both => std::cmp::min(
568                self.compute_candidate_id(ConfigStage::New),
569                self.compute_candidate_id(ConfigStage::Old),
570            ),
571            _ => self.compute_candidate_id(ConfigStage::New),
572        };
573        if candidate_id > std::cmp::max(self.prev_term_last_id, self.committed_id) {
574            trace!(
575                "node {} channel {} advance committed index from {} to {}, stage: {:?}",
576                self.local_id,
577                self.channel_id,
578                self.committed_id,
579                candidate_id,
580                stage
581            );
582            self.committed_id = candidate_id;
583            true
584        } else {
585            false
586        }
587    }
588
589    pub fn update_committed_id(&mut self, id: u64) {
590        if self.committed_id < id {
591            debug!(
592                "node {} channel {} update committed id from {} to {}",
593                self.local_id, self.channel_id, self.committed_id, id
594            );
595            self.committed_id = id;
596        }
597    }
598
599    pub fn update_progress(&mut self, remote: u64, reject: bool, msg_id: u64, hint_id: u64) {
600        let is_leader = self.is_leader();
601        if let Some(progress) = self.progress_map.get_mut(&remote) {
602            progress.on_receive_msg();
603            if !reject {
604                debug!(
605                    "node {} channel {} try advance remote {} match id to {}",
606                    self.local_id,
607                    self.channel_id,
608                    remote,
609                    hint_id - 1u64
610                );
611                progress.try_advance_matched_id(hint_id);
612            } else if is_leader {
613                debug!(
614                    "node {} channel {} try reset remote {} next id to {}",
615                    self.local_id, self.channel_id, remote, hint_id
616                );
617                progress.try_reset_next_id(msg_id, hint_id);
618            } else {
619                warn!(
620                    "node {} channel id {} receive a rejected index request {:?} but I isn't a leader any more",
621                    self.local_id, self.channel_id, reject
622                );
623            }
624        }
625    }
626
627    pub fn is_leader(&self) -> bool {
628        self.role == Role::Leader
629    }
630
631    pub fn is_candidate(&self) -> bool {
632        self.role == Role::Candidate
633    }
634
635    pub fn is_follower(&self) -> bool {
636        self.role == Role::Follower
637    }
638
639    pub fn is_student(&self) -> bool {
640        self.role == Role::Student
641    }
642
    /// Returns `true` when a leader is known, i.e. `leader_id` is not
    /// `INVALID_NODE_ID`.
    pub fn has_leader(&self) -> bool {
        self.leader_id != INVALID_NODE_ID
    }
646
647    pub fn is_remote_matched(&self, remote: u64, last_id: u64) -> bool {
648        if let Some(progress) = self.progress_map.get(&remote) {
649            if progress.match_id == last_id {
650                return true;
651            }
652        }
653        false
654    }
655
656    pub fn missed_channel_ids(&self) -> Vec<u64> {
657        let mut missed_channel_ids = Vec::new();
658        for channel_id in &self.missed_voters {
659            if self.learned_voters.contains(&channel_id) {
660                continue;
661            }
662            missed_channel_ids.push(*channel_id);
663        }
664        missed_channel_ids
665    }
666
667    pub fn update_local_match(&mut self, to: u64) {
668        if let Some(progress) = self.progress_map.get_mut(&self.local_id) {
669            debug!(
670                "node {} channel {} update the local matched entry id from {} to {}",
671                self.local_id, self.channel_id, progress.match_id, to
672            );
673            assert_eq!(progress.match_id <= to, true);
674            progress.match_id = to;
675        }
676    }
677
678    pub fn get_local_match_id(&self) -> u64 {
679        self.progress_map
680            .get(&self.local_id)
681            .map(|p| p.match_id)
682            .unwrap_or(INVALID_ID)
683    }
684
685    pub fn get_local_safety_commit_id(&self) -> u64 {
686        std::cmp::min(self.committed_id, self.get_local_match_id())
687    }
688
689    pub fn log_replicated(&mut self, node_id: u64, next_id: u64) {
690        self.progress_map
691            .get_mut(&node_id)
692            .map(|p| p.step_builded(next_id));
693    }
694
695    pub fn hard_state(&self) -> HardState {
696        HardState {
697            voted_for: self.voted_for,
698            current_term: self.term,
699        }
700    }
701
702    pub fn enter_both_config_stage(
703        &mut self,
704        new_configs: &HashSet<u64>,
705        old_configs: &HashSet<u64>,
706    ) {
707        debug!(
708            "node {} channel {} enter both stage: new configs {:?}, old configs {:?}",
709            self.local_id, self.channel_id, new_configs, old_configs
710        );
711        for (id, progress) in &mut self.progress_map {
712            progress.belongs_to = if old_configs.contains(id) {
713                ConfigStage::Old
714            } else {
715                ConfigStage::Both
716            };
717        }
718
719        for id in new_configs {
720            if let Some(p) = self.progress_map.get_mut(id) {
721                p.belongs_to = ConfigStage::New;
722                continue;
723            }
724            self.progress_map
725                .insert(*id, Progress::new(*id, ConfigStage::New));
726        }
727    }
728
729    pub fn enter_new_config_stage(
730        &mut self,
731        new_configs: &HashSet<u64>,
732        old_configs: &HashSet<u64>,
733    ) {
734        debug!(
735            "node {} channel {} enter new stage: new configs {:?}, old configs {:?}",
736            self.local_id, self.channel_id, new_configs, old_configs
737        );
738        for id in old_configs {
739            self.progress_map.remove(id);
740        }
741        for (_id, progress) in &mut self.progress_map {
742            progress.belongs_to = ConfigStage::New;
743        }
744    }
745
    /// Undoes a configuration change.
    ///
    /// Runs `enter_new_config_stage` with the two sets swapped, so the members
    /// listed in `new_configs` are the ones dropped and every remaining member
    /// is assigned to `ConfigStage::New`.
    pub fn rollback_config_change(
        &mut self,
        new_configs: &HashSet<u64>,
        old_configs: &HashSet<u64>,
    ) {
        // Arguments are intentionally passed in swapped order.
        self.enter_new_config_stage(old_configs, new_configs);
    }
753
754    pub fn get_member_states(&self) -> HashMap<u64, MemberState> {
755        self.progress_map
756            .iter()
757            .filter(|(id, _)| **id != INDEX_CHANNEL_ID)
758            .map(|(id, _)| (*id, MemberState::default()))
759            .collect::<HashMap<_, _>>()
760    }
761
762    pub fn matched_committed_id(&self, id: u64) -> u64 {
763        let match_id = self.progress_map.get(&id).unwrap().match_id;
764        std::cmp::min(match_id, self.committed_id)
765    }
766}
767
#[cfg(test)]
mod tests {
    use super::*;

    /// `reset` must bring a progress back to `Probe`, even after an
    /// acknowledgement switched it to `Replicate`.
    #[test]
    fn reset_to_probe() {
        let mut p = Progress::new(1, ConfigStage::New);
        let last_id = 10;
        p.reset(last_id);
        assert_eq!(p.next_id, last_id + 1u64);
        assert_eq!(p.match_id, INVALID_ID);
        assert_eq!(p.state, ProgressState::Probe);

        let hint_id = 5;
        p.try_advance_matched_id(hint_id);
        assert_eq!(p.match_id, hint_id - 1u64);
        assert_eq!(p.next_id, last_id + 1u64);
        assert_eq!(p.state, ProgressState::Replicate);

        p.reset(123);
        assert_eq!(p.state, ProgressState::Probe);
    }

    /// Walks `match_id` / `next_id` through acknowledgements, stale replies
    /// and rejections.
    #[test]
    fn update_match_next_id() {
        let mut p = Progress::new(1, ConfigStage::New);
        let last_id = 10;
        p.reset(last_id);
        assert_eq!(p.next_id, last_id + 1u64);
        assert_eq!(p.match_id, INVALID_ID);
        assert_eq!(p.state, ProgressState::Probe);

        let mut hint_id = 5;
        p.try_advance_matched_id(hint_id);
        assert_eq!(p.match_id, hint_id - 1u64);
        assert_eq!(p.next_id, last_id + 1u64);
        assert_eq!(p.state, ProgressState::Replicate);

        // A stale acknowledgement is ignored.
        hint_id = 3;
        p.try_advance_matched_id(hint_id);
        assert_eq!(p.match_id, 4);
        assert_eq!(p.next_id, last_id + 1u64);

        // Advance up to next_id.
        p.try_advance_matched_id(last_id + 1u64);
        assert_eq!(p.match_id, last_id);
        assert_eq!(p.next_id, last_id + 1u64);

        // A stale reset request is ignored.
        p.next_id = 100;
        p.try_reset_next_id(hint_id, hint_id);
        assert_eq!(p.match_id, last_id);
        assert_eq!(p.next_id, 100);

        // A rejection while replicating falls back to probing from match_id + 1.
        p.state = ProgressState::Replicate;
        p.try_reset_next_id(p.next_id, 50);
        assert_eq!(p.match_id, last_id);
        assert_eq!(p.next_id, p.match_id + 1u64);

        // A rejection of the probed id moves next_id to the hint.
        assert_eq!(p.state, ProgressState::Probe);
        p.try_reset_next_id(p.next_id, 50);
        assert_eq!(p.match_id, last_id);
        assert_eq!(p.next_id, 50);
    }
}
833}