1use std::{
22 collections::VecDeque,
23 fmt::{Display, Formatter},
24 mem,
25};
26
27use prost::Message as PbMessage;
28use tracing::info;
29
30use crate::{
31 config::Config,
32 eraftpb::{ConfChange, ConfState, Entry, EntryType, HardState, Message, MessageType, Snapshot},
33 errors::{Error, Result},
34 read_only::ReadState,
35 storage::GetEntriesFor,
36 GetEntriesContext, Raft, SoftState, StateRole, Status, Storage,
37};
38
/// A peer, described by a numeric id together with an address string.
#[derive(Clone, Debug, Default)]
pub struct Peer {
    /// Numeric identifier of the peer.
    pub id: u64,
    /// Address of the peer, stored as an opaque string.
    pub address: String,
}
47
48impl Display for Peer {
49 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
50 write!(f, "n{}({})", self.id, self.address)
51 }
52}
53
/// Outcome reported for a snapshot.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum SnapshotStatus {
    /// The snapshot finished.
    Finish,
    /// The snapshot failed.
    Failure,
}
62
63pub fn is_local_msg(t: MessageType) -> bool {
65 matches!(
66 t,
67 MessageType::MsgHup
68 | MessageType::MsgBeat
69 | MessageType::MsgUnreachable
70 | MessageType::MsgSnapStatus
71 | MessageType::MsgCheckQuorum
72 )
73}
74
75fn is_response_msg(t: MessageType) -> bool {
76 matches!(
77 t,
78 MessageType::MsgAppendResponse
79 | MessageType::MsgRequestVoteResponse
80 | MessageType::MsgHeartbeatResponse
81 | MessageType::MsgUnreachable
82 | MessageType::MsgRequestPreVoteResponse
83 )
84}
85
86#[derive(Default, Debug, PartialEq)]
89pub struct Ready {
90 number: u64,
91
92 ss: Option<SoftState>,
93
94 hs: Option<HardState>,
95
96 read_states: Vec<ReadState>,
97
98 entries: Vec<Entry>,
99
100 snapshot: Snapshot,
101
102 is_persisted_msg: bool,
103
104 light: LightReady,
105
106 must_sync: bool,
107}
108
109impl Ready {
110 #[inline]
113 pub fn number(&self) -> u64 {
114 self.number
115 }
116
117 #[inline]
121 pub fn ss(&self) -> Option<&SoftState> {
122 self.ss.as_ref()
123 }
124
125 #[inline]
128 pub fn hs(&self) -> Option<&HardState> {
129 self.hs.as_ref()
130 }
131
132 #[inline]
134 pub fn read_states(&self) -> &Vec<ReadState> {
135 &self.read_states
136 }
137
138 #[inline]
140 pub fn take_read_states(&mut self) -> Vec<ReadState> {
141 mem::take(&mut self.read_states)
142 }
143
144 #[inline]
146 pub fn entries(&self) -> &Vec<Entry> {
147 &self.entries
148 }
149
150 #[inline]
152 pub fn take_entries(&mut self) -> Vec<Entry> {
153 mem::take(&mut self.entries)
154 }
155
156 #[inline]
158 pub fn snapshot(&self) -> &Snapshot {
159 &self.snapshot
160 }
161
162 #[inline]
166 pub fn committed_entries(&self) -> &Vec<Entry> {
167 self.light.committed_entries()
168 }
169
170 #[inline]
172 pub fn take_committed_entries(&mut self) -> Vec<Entry> {
173 self.light.take_committed_entries()
174 }
175
176 #[inline]
180 pub fn messages(&self) -> &[Message] {
181 if !self.is_persisted_msg {
182 self.light.messages()
183 } else {
184 &[]
185 }
186 }
187
188 #[inline]
190 pub fn take_messages(&mut self) -> Vec<Message> {
191 if !self.is_persisted_msg {
192 self.light.take_messages()
193 } else {
194 Vec::new()
195 }
196 }
197
198 #[inline]
201 pub fn persisted_messages(&self) -> &[Message] {
202 if self.is_persisted_msg {
203 self.light.messages()
204 } else {
205 &[]
206 }
207 }
208
209 #[inline]
211 pub fn take_persisted_messages(&mut self) -> Vec<Message> {
212 if self.is_persisted_msg {
213 self.light.take_messages()
214 } else {
215 Vec::new()
216 }
217 }
218
219 #[inline]
225 pub fn must_sync(&self) -> bool {
226 self.must_sync
227 }
228}
229
/// Bookkeeping entry kept for each `Ready` that has been handed out.
#[derive(Debug, Default, PartialEq)]
struct ReadyRecord {
    /// Sequence number of the corresponding `Ready`.
    number: u64,
    /// `(index, term)` of the last entry in that `Ready`, if it had entries.
    last_entry: Option<(u64, u64)>,
    /// `(index, term)` of the snapshot in that `Ready`, if it had one.
    snapshot: Option<(u64, u64)>,
}
239
240#[derive(Default, Debug, PartialEq)]
243pub struct LightReady {
244 commit_index: Option<u64>,
245 committed_entries: Vec<Entry>,
246 messages: Vec<Message>,
247}
248
249impl LightReady {
250 #[inline]
254 pub fn commit_index(&self) -> Option<u64> {
255 self.commit_index
256 }
257
258 #[inline]
262 pub fn committed_entries(&self) -> &Vec<Entry> {
263 &self.committed_entries
264 }
265
266 #[inline]
268 pub fn take_committed_entries(&mut self) -> Vec<Entry> {
269 mem::take(&mut self.committed_entries)
270 }
271
272 #[inline]
274 pub fn messages(&self) -> &[Message] {
275 &self.messages
276 }
277
278 #[inline]
280 pub fn take_messages(&mut self) -> Vec<Message> {
281 mem::take(&mut self.messages)
282 }
283}
284
285pub struct RawNode<T: Storage> {
289 pub raft: Raft<T>,
291 prev_ss: SoftState,
292 prev_hs: HardState,
293 max_number: u64,
295 records: VecDeque<ReadyRecord>,
296 commit_since_index: u64,
298}
299
300impl<T: Storage> RawNode<T> {
301 pub fn new(config: &Config, store: T) -> Result<Self> {
303 assert_ne!(config.id, 0, "config.id must not be zero");
304 let r = Raft::new(config, store)?;
305 let mut rn = RawNode {
306 raft: r,
307 prev_hs: Default::default(),
308 prev_ss: Default::default(),
309 max_number: 0,
310 records: VecDeque::new(),
311 commit_since_index: config.applied,
312 };
313 rn.prev_hs = rn.raft.hard_state();
314 rn.prev_ss = rn.raft.soft_state();
315 info!("RawNode created with id {id}.", id = rn.raft.id);
316 Ok(rn)
317 }
318
319 #[inline]
321 pub fn set_priority(&mut self, priority: i64) {
322 self.raft.set_priority(priority);
323 }
324
325 pub fn tick(&mut self) -> bool {
330 self.raft.tick()
331 }
332
333 pub fn campaign(&mut self) -> Result<()> {
335 let mut m = Message::default();
336 m.set_msg_type(MessageType::MsgHup);
337 self.raft.step(m)
338 }
339
340 pub fn propose(&mut self, context: Vec<u8>, data: Vec<u8>) -> Result<()> {
342 let m = Message {
343 msg_type: MessageType::MsgPropose as i32,
344 from: self.raft.id,
345 entries: vec![Entry {
346 data,
347 context,
348 ..Default::default()
349 }],
350 ..Default::default()
351 };
352 self.raft.step(m)
353 }
354
355 pub fn ping(&mut self) {
359 self.raft.ping()
360 }
361
362 #[cfg_attr(feature = "cargo-clippy", allow(clippy::needless_pass_by_value))]
368 pub fn propose_conf_change(&mut self, context: Vec<u8>, cc: ConfChange) -> Result<()> {
369 let (data, ty) = (cc.encode_to_vec(), EntryType::EntryConfChange);
370 let m = Message {
371 msg_type: MessageType::MsgPropose as i32,
372 entries: vec![Entry {
373 entry_type: ty as i32,
374 data,
375 context,
376 ..Default::default()
377 }],
378 ..Default::default()
379 };
380 self.raft.step(m)
381 }
382
383 pub fn apply_conf_change(&mut self, cc: &ConfChange) -> Result<ConfState> {
387 self.raft.apply_conf_change(cc)
388 }
389
390 pub fn step(&mut self, m: Message) -> Result<()> {
392 if is_local_msg(m.msg_type()) {
394 return Err(Error::StepLocalMsg);
395 }
396 if self.raft.prs().get(m.from).is_some() || !is_response_msg(m.msg_type()) {
397 return self.raft.step(m);
398 }
399 Err(Error::StepPeerNotFound)
400 }
401
402 pub fn on_entries_fetched(&mut self, context: GetEntriesContext) {
410 match context.0 {
411 GetEntriesFor::SendAppend {
412 to,
413 term,
414 aggressively,
415 } => {
416 if self.raft.term != term || self.raft.state != StateRole::Leader {
417 return;
419 }
420 if self.raft.prs().get(to).is_none() {
421 return;
423 }
424
425 if aggressively {
426 self.raft.send_append_aggressively(to)
427 } else {
428 self.raft.send_append(to)
429 }
430 }
431 GetEntriesFor::Empty(can_async) if can_async => {}
432 _ => panic!("shouldn't call callback on non-async context"),
433 }
434 }
435
436 fn gen_light_ready(&mut self) -> LightReady {
438 let mut rd = LightReady::default();
439 let max_size = Some(self.raft.max_committed_size_per_ready);
440 let raft = &mut self.raft;
441 rd.committed_entries = raft
442 .raft_log
443 .next_entries_since(self.commit_since_index, max_size)
444 .unwrap_or_default();
445 raft.reduce_uncommitted_size(&rd.committed_entries);
447 if let Some(e) = rd.committed_entries.last() {
448 assert!(self.commit_since_index < e.index);
449 self.commit_since_index = e.index;
450 }
451
452 if !raft.msgs.is_empty() {
453 rd.messages = mem::take(&mut raft.msgs);
454 }
455
456 rd
457 }
458
459 pub fn ready(&mut self) -> Ready {
468 let raft = &mut self.raft;
469
470 self.max_number += 1;
471 let mut rd = Ready {
472 number: self.max_number,
473 ..Default::default()
474 };
475 let mut rd_record = ReadyRecord {
476 number: self.max_number,
477 ..Default::default()
478 };
479
480 if self.prev_ss.raft_state != StateRole::Leader && raft.state == StateRole::Leader {
481 for record in self.records.drain(..) {
485 assert_eq!(record.last_entry, None);
486 assert_eq!(record.snapshot, None);
487 }
488 }
489
490 let ss = raft.soft_state();
491 if ss != self.prev_ss {
492 rd.ss = Some(ss);
493 }
494 let hs = raft.hard_state();
495 if hs != self.prev_hs {
496 if hs.vote != self.prev_hs.vote || hs.term != self.prev_hs.term {
497 rd.must_sync = true;
498 }
499 rd.hs = Some(hs);
500 }
501
502 if !raft.read_states.is_empty() {
503 rd.read_states = mem::take(&mut raft.read_states);
504 }
505
506 if let Some(snapshot) = &raft.raft_log.unstable_snapshot() {
507 rd.snapshot = snapshot.clone();
508 assert!(self.commit_since_index <= rd.snapshot.get_metadata().index);
509 self.commit_since_index = rd.snapshot.get_metadata().index;
510 assert!(
513 !raft
514 .raft_log
515 .has_next_entries_since(self.commit_since_index),
516 "has snapshot but also has committed entries since {}",
517 self.commit_since_index
518 );
519 rd_record.snapshot = Some((
520 rd.snapshot.get_metadata().index,
521 rd.snapshot.get_metadata().term,
522 ));
523 rd.must_sync = true;
524 }
525
526 rd.entries = raft.raft_log.unstable_entries().to_vec();
527 if let Some(e) = rd.entries.last() {
528 rd.must_sync = true;
530 rd_record.last_entry = Some((e.index, e.term));
531 }
532
533 rd.is_persisted_msg = raft.state != StateRole::Leader;
536 rd.light = self.gen_light_ready();
537 self.records.push_back(rd_record);
538 rd
539 }
540
541 pub fn has_ready(&self) -> bool {
543 let raft = &self.raft;
544 if !raft.msgs.is_empty() {
545 return true;
546 }
547
548 if raft.soft_state() != self.prev_ss {
549 return true;
550 }
551 if raft.hard_state() != self.prev_hs {
552 return true;
553 }
554
555 if !raft.read_states.is_empty() {
556 return true;
557 }
558
559 if !raft.raft_log.unstable_entries().is_empty() {
560 return true;
561 }
562
563 if self.snap().map_or(false, |s| !s.is_empty()) {
564 return true;
565 }
566
567 if raft
568 .raft_log
569 .has_next_entries_since(self.commit_since_index)
570 {
571 return true;
572 }
573
574 false
575 }
576
577 fn commit_ready(&mut self, rd: Ready) {
578 if let Some(ss) = rd.ss {
579 self.prev_ss = ss;
580 }
581 if let Some(hs) = rd.hs {
582 self.prev_hs = hs;
583 }
584 let rd_record = self.records.back().unwrap();
585 assert_eq!(rd_record.number, rd.number);
586 let raft = &mut self.raft;
587 if let Some((index, _)) = rd_record.snapshot {
588 raft.raft_log.stable_snap(index);
589 }
590 if let Some((index, term)) = rd_record.last_entry {
591 raft.raft_log.stable_entries(index, term);
592 }
593 }
594
595 fn commit_apply(&mut self, applied: u64) {
596 self.raft.commit_apply(applied);
597 }
598
599 pub fn on_persist_ready(&mut self, number: u64) {
607 let (mut index, mut term) = (0, 0);
608 let mut snap_index = 0;
609 while let Some(record) = self.records.front() {
610 if record.number > number {
611 break;
612 }
613 let record = self.records.pop_front().unwrap();
614
615 if let Some((i, _)) = record.snapshot {
616 snap_index = i;
617 index = 0;
618 term = 0;
619 }
620
621 if let Some((i, t)) = record.last_entry {
622 index = i;
623 term = t;
624 }
625 }
626 if snap_index != 0 {
627 self.raft.on_persist_snap(snap_index);
628 }
629 if index != 0 {
630 self.raft.on_persist_entries(index, term);
631 }
632 }
633
634 pub fn advance(&mut self, rd: Ready) -> LightReady {
644 let applied = self.commit_since_index;
645 let light_rd = self.advance_append(rd);
646 self.advance_apply_to(applied);
647 light_rd
648 }
649
650 #[inline]
658 pub fn advance_append(&mut self, rd: Ready) -> LightReady {
659 self.commit_ready(rd);
660 self.on_persist_ready(self.max_number);
661 let mut light_rd = self.gen_light_ready();
662 if self.raft.state != StateRole::Leader && !light_rd.messages().is_empty() {
663 panic!("not leader but has new msg after advance");
664 }
665 let hard_state = self.raft.hard_state();
667 if hard_state.commit > self.prev_hs.commit {
668 light_rd.commit_index = Some(hard_state.commit);
669 self.prev_hs.commit = hard_state.commit;
670 } else {
671 assert_eq!(hard_state.commit, self.prev_hs.commit);
672 light_rd.commit_index = None;
673 }
674 assert_eq!(hard_state, self.prev_hs, "hard state != prev_hs");
675 light_rd
676 }
677
678 #[inline]
687 pub fn advance_append_async(&mut self, rd: Ready) {
688 self.commit_ready(rd);
689 }
690
691 #[inline]
693 pub fn advance_apply(&mut self) {
694 self.commit_apply(self.commit_since_index);
695 }
696
697 #[inline]
699 pub fn advance_apply_to(&mut self, applied: u64) {
700 self.commit_apply(applied);
701 }
702
703 #[inline]
705 pub fn snap(&self) -> Option<&Snapshot> {
706 self.raft.snap()
707 }
708
709 #[inline]
711 pub fn status(&self) -> Status {
712 Status::new(&self.raft)
713 }
714
715 pub fn report_unreachable(&mut self, id: u64) {
717 let mut m = Message::default();
718 m.set_msg_type(MessageType::MsgUnreachable);
719 m.from = id;
720 let _ = self.raft.step(m);
722 }
723
724 pub fn report_snapshot(&mut self, id: u64, status: SnapshotStatus) {
726 let rej = status == SnapshotStatus::Failure;
727 let mut m = Message::default();
728 m.set_msg_type(MessageType::MsgSnapStatus);
729 m.from = id;
730 m.reject = rej;
731 let _ = self.raft.step(m);
733 }
734
735 pub fn request_snapshot(&mut self) -> Result<()> {
739 self.raft.request_snapshot()
740 }
741
742 pub fn transfer_leader(&mut self, transferee: u64) {
744 let mut m = Message::default();
745 m.set_msg_type(MessageType::MsgTransferLeader);
746 m.from = transferee;
747 let _ = self.raft.step(m);
748 }
749
750 pub fn read_index(&mut self, rctx: Vec<u8>) {
755 let m = Message {
756 msg_type: MessageType::MsgReadIndex as i32,
757 entries: vec![Entry {
758 data: rctx,
759 ..Default::default()
760 }],
761 ..Default::default()
762 };
763 let _ = self.raft.step(m);
764 }
765
766 #[inline]
768 pub fn store(&self) -> &T {
769 self.raft.store()
770 }
771
772 #[inline]
774 pub fn mut_store(&mut self) -> &mut T {
775 self.raft.mut_store()
776 }
777
778 #[inline]
780 pub fn skip_bcast_commit(&mut self, skip: bool) {
781 self.raft.skip_bcast_commit(skip)
782 }
783
784 #[inline]
786 pub fn set_batch_append(&mut self, batch_append: bool) {
787 self.raft.set_batch_append(batch_append)
788 }
789}
790
#[cfg(test)]
mod test {
    use super::is_local_msg;
    use crate::eraftpb::MessageType;

    /// Checks the local/non-local classification of every listed message type.
    #[test]
    fn test_is_local_msg() {
        let cases = [
            // Local message types.
            (MessageType::MsgHup, true),
            (MessageType::MsgBeat, true),
            (MessageType::MsgUnreachable, true),
            (MessageType::MsgSnapStatus, true),
            (MessageType::MsgCheckQuorum, true),
            // Everything else.
            (MessageType::MsgPropose, false),
            (MessageType::MsgAppend, false),
            (MessageType::MsgAppendResponse, false),
            (MessageType::MsgRequestVote, false),
            (MessageType::MsgRequestVoteResponse, false),
            (MessageType::MsgSnapshot, false),
            (MessageType::MsgHeartbeat, false),
            (MessageType::MsgHeartbeatResponse, false),
            (MessageType::MsgTransferLeader, false),
            (MessageType::MsgTimeoutNow, false),
            (MessageType::MsgReadIndex, false),
            (MessageType::MsgReadIndexResp, false),
            (MessageType::MsgRequestPreVote, false),
            (MessageType::MsgRequestPreVoteResponse, false),
        ];
        for &(msg_type, expected) in cases.iter() {
            assert_eq!(is_local_msg(msg_type), expected);
        }
    }
}