1use std::sync::Arc;
24use std::{collections::VecDeque, mem};
25
26use crate::logger::Logger;
27use protobuf::Message as PbMessage;
28use raft_proto::ConfChangeI;
29
30use crate::eraftpb::{ConfState, Entry, EntryType, HardState, Message, MessageType, Snapshot};
31use crate::errors::{Error, Result};
32use crate::read_only::ReadState;
33use crate::{config::Config, StateRole};
34use crate::{storage::GetEntriesFor, GetEntriesContext, Raft, SoftState, Status, Storage};
35
/// A peer, described by its id and an optional opaque context payload.
#[derive(Default, Debug)]
pub struct Peer {
    /// Identifier of the peer.
    pub id: u64,
    /// Optional bytes attached to the peer; `None` when nothing is attached.
    pub context: Option<Vec<u8>>,
}
45
/// Outcome of a snapshot, as passed to `RawNode::report_snapshot`.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum SnapshotStatus {
    /// The snapshot finished.
    Finish,
    /// The snapshot failed; `report_snapshot` turns this into a rejecting message.
    Failure,
}
54
55pub fn is_local_msg(t: MessageType) -> bool {
57 matches!(
58 t,
59 MessageType::MsgHup
60 | MessageType::MsgBeat
61 | MessageType::MsgUnreachable
62 | MessageType::MsgSnapStatus
63 | MessageType::MsgCheckQuorum
64 )
65}
66
67fn is_response_msg(t: MessageType) -> bool {
68 matches!(
69 t,
70 MessageType::MsgAppendResponse
71 | MessageType::MsgRequestVoteResponse
72 | MessageType::MsgHeartbeatResponse
73 | MessageType::MsgUnreachable
74 | MessageType::MsgRequestPreVoteResponse
75 )
76}
77
78#[deprecated(since = "0.6.0", note = "Please use `Snapshot::is_empty` instead")]
80pub fn is_empty_snap(s: &Snapshot) -> bool {
81 s.is_empty()
82}
83
84#[derive(Default, Debug, PartialEq)]
87pub struct Ready {
88 number: u64,
89
90 ss: Option<SoftState>,
91
92 hs: Option<HardState>,
93
94 read_states: Vec<ReadState>,
95
96 entries: Vec<Entry>,
97
98 snapshot: Snapshot,
99
100 is_persisted_msg: bool,
101
102 light: LightReady,
103
104 must_sync: bool,
105}
106
107impl Ready {
108 #[inline]
111 pub fn number(&self) -> u64 {
112 self.number
113 }
114
115 #[inline]
119 pub fn ss(&self) -> Option<&SoftState> {
120 self.ss.as_ref()
121 }
122
123 #[inline]
126 pub fn hs(&self) -> Option<&HardState> {
127 self.hs.as_ref()
128 }
129
130 #[inline]
132 pub fn read_states(&self) -> &Vec<ReadState> {
133 &self.read_states
134 }
135
136 #[inline]
138 pub fn take_read_states(&mut self) -> Vec<ReadState> {
139 mem::take(&mut self.read_states)
140 }
141
142 #[inline]
144 pub fn entries(&self) -> &Vec<Entry> {
145 &self.entries
146 }
147
148 #[inline]
150 pub fn take_entries(&mut self) -> Vec<Entry> {
151 mem::take(&mut self.entries)
152 }
153
154 #[inline]
156 pub fn snapshot(&self) -> &Snapshot {
157 &self.snapshot
158 }
159
160 #[inline]
164 pub fn committed_entries(&self) -> &Vec<Entry> {
165 self.light.committed_entries()
166 }
167
168 #[inline]
170 pub fn take_committed_entries(&mut self) -> Vec<Entry> {
171 self.light.take_committed_entries()
172 }
173
174 #[inline]
178 pub fn messages(&self) -> &[Message] {
179 if !self.is_persisted_msg {
180 self.light.messages()
181 } else {
182 &[]
183 }
184 }
185
186 #[inline]
188 pub fn take_messages(&mut self) -> Vec<Message> {
189 if !self.is_persisted_msg {
190 self.light.take_messages()
191 } else {
192 Vec::new()
193 }
194 }
195
196 #[inline]
199 pub fn persisted_messages(&self) -> &[Message] {
200 if self.is_persisted_msg {
201 self.light.messages()
202 } else {
203 &[]
204 }
205 }
206
207 #[inline]
209 pub fn take_persisted_messages(&mut self) -> Vec<Message> {
210 if self.is_persisted_msg {
211 self.light.take_messages()
212 } else {
213 Vec::new()
214 }
215 }
216
217 #[inline]
223 pub fn must_sync(&self) -> bool {
224 self.must_sync
225 }
226}
227
/// Bookkeeping kept by `RawNode` for each `Ready` it has produced.
#[derive(Debug, Default, PartialEq)]
struct ReadyRecord {
    /// Number of the `Ready` this record belongs to.
    number: u64,
    /// `(index, term)` of the last entry carried by that ready, if it had entries.
    last_entry: Option<(u64, u64)>,
    /// `(index, term)` of the snapshot carried by that ready, if it had one.
    snapshot: Option<(u64, u64)>,
}
237
238#[derive(Default, Debug, PartialEq)]
241pub struct LightReady {
242 commit_index: Option<u64>,
243 committed_entries: Vec<Entry>,
244 messages: Vec<Message>,
245}
246
247impl LightReady {
248 #[inline]
252 pub fn commit_index(&self) -> Option<u64> {
253 self.commit_index
254 }
255
256 #[inline]
260 pub fn committed_entries(&self) -> &Vec<Entry> {
261 &self.committed_entries
262 }
263
264 #[inline]
266 pub fn take_committed_entries(&mut self) -> Vec<Entry> {
267 mem::take(&mut self.committed_entries)
268 }
269
270 #[inline]
272 pub fn messages(&self) -> &[Message] {
273 &self.messages
274 }
275
276 #[inline]
278 pub fn take_messages(&mut self) -> Vec<Message> {
279 mem::take(&mut self.messages)
280 }
281}
282
283pub struct RawNode<T: Storage> {
287 pub raft: Raft<T>,
289 prev_ss: SoftState,
290 prev_hs: HardState,
291 max_number: u64,
293 records: VecDeque<ReadyRecord>,
294 commit_since_index: u64,
296}
297
298impl<T: Storage> RawNode<T> {
299 #[allow(clippy::new_ret_no_self)]
300 pub fn new(config: &Config, store: T, logger: Arc<dyn Logger>) -> Result<Self> {
302 assert_ne!(config.id, 0, "config.id must not be zero");
303 let r = Raft::new(config, store, logger)?;
304 let mut rn = RawNode {
305 raft: r,
306 prev_hs: Default::default(),
307 prev_ss: Default::default(),
308 max_number: 0,
309 records: VecDeque::new(),
310 commit_since_index: config.applied,
311 };
312 rn.prev_hs = rn.raft.hard_state();
313 rn.prev_ss = rn.raft.soft_state();
314 rn.raft
315 .logger
316 .info(format!("RawNode created with id {}.", rn.raft.id).as_str());
317 Ok(rn)
318 }
319
/// Builds a `RawNode` like [`RawNode::new`], using an `Slogger` that wraps
/// `crate::default_logger()`.
///
/// Only available with the `default-logger` feature.
#[cfg(feature = "default-logger")]
#[allow(clippy::new_ret_no_self)]
pub fn with_default_logger(c: &Config, store: T) -> Result<Self> {
    use crate::logger::Slogger;

    let logger = Arc::new(Slogger {
        slog: crate::default_logger(),
    });
    Self::new(c, store, logger)
}
336
337 #[inline]
339 pub fn set_priority(&mut self, priority: i64) {
340 self.raft.set_priority(priority);
341 }
342
343 pub fn tick(&mut self) -> bool {
348 self.raft.tick()
349 }
350
351 pub fn campaign(&mut self) -> Result<()> {
353 let mut m = Message::default();
354 m.set_msg_type(MessageType::MsgHup);
355 self.raft.step(m)
356 }
357
358 pub fn propose(&mut self, context: Vec<u8>, data: Vec<u8>) -> Result<()> {
360 let mut m = Message::default();
361 m.set_msg_type(MessageType::MsgPropose);
362 m.from = self.raft.id;
363 let mut e = Entry::default();
364 e.data = data.into();
365 e.context = context.into();
366 m.set_entries(vec![e].into());
367 self.raft.step(m)
368 }
369
370 pub fn ping(&mut self) {
374 self.raft.ping()
375 }
376
377 pub fn propose_conf_change(&mut self, context: Vec<u8>, cc: impl ConfChangeI) -> Result<()> {
383 let (data, ty) = if let Some(cc) = cc.as_v1() {
384 (cc.write_to_bytes()?, EntryType::EntryConfChange)
385 } else {
386 (cc.as_v2().write_to_bytes()?, EntryType::EntryConfChangeV2)
387 };
388 let mut m = Message::default();
389 m.set_msg_type(MessageType::MsgPropose);
390 let mut e = Entry::default();
391 e.set_entry_type(ty);
392 e.data = data.into();
393 e.context = context.into();
394 m.set_entries(vec![e].into());
395 self.raft.step(m)
396 }
397
398 pub fn apply_conf_change(&mut self, cc: &impl ConfChangeI) -> Result<ConfState> {
402 self.raft.apply_conf_change(&cc.as_v2())
403 }
404
405 pub fn step(&mut self, m: Message) -> Result<()> {
407 if is_local_msg(m.get_msg_type()) {
409 return Err(Error::StepLocalMsg);
410 }
411 if self.raft.prs().get(m.from).is_some() || !is_response_msg(m.get_msg_type()) {
412 return self.raft.step(m);
413 }
414 Err(Error::StepPeerNotFound)
415 }
416
417 pub fn on_entries_fetched(&mut self, context: GetEntriesContext) {
425 match context.0 {
426 GetEntriesFor::SendAppend {
427 to,
428 term,
429 aggressively,
430 } => {
431 if self.raft.term != term || self.raft.state != StateRole::Leader {
432 return;
434 }
435 if self.raft.prs().get(to).is_none() {
436 return;
438 }
439
440 if aggressively {
441 self.raft.send_append_aggressively(to)
442 } else {
443 self.raft.send_append(to)
444 }
445 }
446 GetEntriesFor::Empty(can_async) if can_async => {}
447 _ => panic!("shouldn't call callback on non-async context"),
448 }
449 }
450
451 fn gen_light_ready(&mut self) -> LightReady {
453 let mut rd = LightReady::default();
454 let max_size = Some(self.raft.max_committed_size_per_ready);
455 let raft = &mut self.raft;
456 rd.committed_entries = raft
457 .raft_log
458 .next_entries_since(self.commit_since_index, max_size)
459 .unwrap_or_default();
460 raft.reduce_uncommitted_size(&rd.committed_entries);
462 if let Some(e) = rd.committed_entries.last() {
463 assert!(self.commit_since_index < e.get_index());
464 self.commit_since_index = e.get_index();
465 }
466
467 if !raft.msgs.is_empty() {
468 rd.messages = mem::take(&mut raft.msgs);
469 }
470
471 rd
472 }
473
474 pub fn ready(&mut self) -> Ready {
483 let raft = &mut self.raft;
484
485 self.max_number += 1;
486 let mut rd = Ready {
487 number: self.max_number,
488 ..Default::default()
489 };
490 let mut rd_record = ReadyRecord {
491 number: self.max_number,
492 ..Default::default()
493 };
494
495 if self.prev_ss.raft_state != StateRole::Leader && raft.state == StateRole::Leader {
496 for record in self.records.drain(..) {
500 assert_eq!(record.last_entry, None);
501 assert_eq!(record.snapshot, None);
502 }
503 }
504
505 let ss = raft.soft_state();
506 if ss != self.prev_ss {
507 rd.ss = Some(ss);
508 }
509 let hs = raft.hard_state();
510 if hs != self.prev_hs {
511 if hs.vote != self.prev_hs.vote || hs.term != self.prev_hs.term {
512 rd.must_sync = true;
513 }
514 rd.hs = Some(hs);
515 }
516
517 if !raft.read_states.is_empty() {
518 rd.read_states = mem::take(&mut raft.read_states);
519 }
520
521 if let Some(snapshot) = &raft.raft_log.unstable_snapshot() {
522 rd.snapshot = snapshot.clone();
523 assert!(self.commit_since_index <= rd.snapshot.get_metadata().index);
524 self.commit_since_index = rd.snapshot.get_metadata().index;
525 assert!(
528 !raft
529 .raft_log
530 .has_next_entries_since(self.commit_since_index),
531 "has snapshot but also has committed entries since {}",
532 self.commit_since_index
533 );
534 rd_record.snapshot = Some((
535 rd.snapshot.get_metadata().index,
536 rd.snapshot.get_metadata().term,
537 ));
538 rd.must_sync = true;
539 }
540
541 rd.entries = raft.raft_log.unstable_entries().to_vec();
542 if let Some(e) = rd.entries.last() {
543 rd.must_sync = true;
545 rd_record.last_entry = Some((e.get_index(), e.get_term()));
546 }
547
548 rd.is_persisted_msg = raft.state != StateRole::Leader;
551 rd.light = self.gen_light_ready();
552 self.records.push_back(rd_record);
553 rd
554 }
555
556 pub fn has_ready(&self) -> bool {
558 let raft = &self.raft;
559 if !raft.msgs.is_empty() {
560 return true;
561 }
562
563 if raft.soft_state() != self.prev_ss {
564 return true;
565 }
566 if raft.hard_state() != self.prev_hs {
567 return true;
568 }
569
570 if !raft.read_states.is_empty() {
571 return true;
572 }
573
574 if !raft.raft_log.unstable_entries().is_empty() {
575 return true;
576 }
577
578 if self.snap().map_or(false, |s| !s.is_empty()) {
579 return true;
580 }
581
582 if raft
583 .raft_log
584 .has_next_entries_since(self.commit_since_index)
585 {
586 return true;
587 }
588
589 false
590 }
591
592 fn commit_ready(&mut self, rd: Ready) {
593 if let Some(ss) = rd.ss {
594 self.prev_ss = ss;
595 }
596 if let Some(hs) = rd.hs {
597 self.prev_hs = hs;
598 }
599 let rd_record = self.records.back().unwrap();
600 assert!(rd_record.number == rd.number);
601 let raft = &mut self.raft;
602 if let Some((index, _)) = rd_record.snapshot {
603 raft.raft_log.stable_snap(index);
604 }
605 if let Some((index, term)) = rd_record.last_entry {
606 raft.raft_log.stable_entries(index, term);
607 }
608 }
609
610 fn commit_apply(&mut self, applied: u64) {
611 self.raft.commit_apply(applied);
612 }
613
614 pub fn on_persist_ready(&mut self, number: u64) {
622 let (mut index, mut term) = (0, 0);
623 let mut snap_index = 0;
624 while let Some(record) = self.records.front() {
625 if record.number > number {
626 break;
627 }
628 let record = self.records.pop_front().unwrap();
629
630 if let Some((i, _)) = record.snapshot {
631 snap_index = i;
632 index = 0;
633 term = 0;
634 }
635
636 if let Some((i, t)) = record.last_entry {
637 index = i;
638 term = t;
639 }
640 }
641 if snap_index != 0 {
642 self.raft.on_persist_snap(snap_index);
643 }
644 if index != 0 {
645 self.raft.on_persist_entries(index, term);
646 }
647 }
648
649 pub fn advance(&mut self, rd: Ready) -> LightReady {
659 let applied = self.commit_since_index;
660 let light_rd = self.advance_append(rd);
661 self.advance_apply_to(applied);
662 light_rd
663 }
664
665 #[inline]
673 pub fn advance_append(&mut self, rd: Ready) -> LightReady {
674 self.commit_ready(rd);
675 self.on_persist_ready(self.max_number);
676 let mut light_rd = self.gen_light_ready();
677 if self.raft.state != StateRole::Leader && !light_rd.messages().is_empty() {
678 self.raft
679 .logger
680 .fatal("not leader but has new msg after advance");
681 }
682 let hard_state = self.raft.hard_state();
684 if hard_state.commit > self.prev_hs.commit {
685 light_rd.commit_index = Some(hard_state.commit);
686 self.prev_hs.commit = hard_state.commit;
687 } else {
688 assert!(hard_state.commit == self.prev_hs.commit);
689 light_rd.commit_index = None;
690 }
691 assert_eq!(hard_state, self.prev_hs, "hard state != prev_hs");
692 light_rd
693 }
694
695 #[inline]
703 pub fn advance_append_async(&mut self, rd: Ready) {
704 self.commit_ready(rd);
705 }
706
707 #[inline]
709 pub fn advance_apply(&mut self) {
710 self.commit_apply(self.commit_since_index);
711 }
712
713 #[inline]
715 pub fn advance_apply_to(&mut self, applied: u64) {
716 self.commit_apply(applied);
717 }
718
719 #[inline]
721 pub fn snap(&self) -> Option<&Snapshot> {
722 self.raft.snap()
723 }
724
725 #[inline]
727 pub fn status(&self) -> Status {
728 Status::new(&self.raft)
729 }
730
731 pub fn report_unreachable(&mut self, id: u64) {
733 let mut m = Message::default();
734 m.set_msg_type(MessageType::MsgUnreachable);
735 m.from = id;
736 let _ = self.raft.step(m);
738 }
739
740 pub fn report_snapshot(&mut self, id: u64, status: SnapshotStatus) {
742 let rej = status == SnapshotStatus::Failure;
743 let mut m = Message::default();
744 m.set_msg_type(MessageType::MsgSnapStatus);
745 m.from = id;
746 m.reject = rej;
747 let _ = self.raft.step(m);
749 }
750
751 pub fn request_snapshot(&mut self) -> Result<()> {
755 self.raft.request_snapshot()
756 }
757
758 pub fn transfer_leader(&mut self, transferee: u64) {
760 let mut m = Message::default();
761 m.set_msg_type(MessageType::MsgTransferLeader);
762 m.from = transferee;
763 let _ = self.raft.step(m);
764 }
765
766 pub fn read_index(&mut self, rctx: Vec<u8>) {
771 let mut m = Message::default();
772 m.set_msg_type(MessageType::MsgReadIndex);
773 let mut e = Entry::default();
774 e.data = rctx.into();
775 m.set_entries(vec![e].into());
776 let _ = self.raft.step(m);
777 }
778
779 #[inline]
781 pub fn store(&self) -> &T {
782 self.raft.store()
783 }
784
785 #[inline]
787 pub fn mut_store(&mut self) -> &mut T {
788 self.raft.mut_store()
789 }
790
791 #[inline]
793 pub fn skip_bcast_commit(&mut self, skip: bool) {
794 self.raft.skip_bcast_commit(skip)
795 }
796
797 #[inline]
799 pub fn set_batch_append(&mut self, batch_append: bool) {
800 self.raft.set_batch_append(batch_append)
801 }
802}
803
#[cfg(test)]
mod test {
    use super::is_local_msg;
    use crate::eraftpb::MessageType;

    /// Checks `is_local_msg` against every message type listed below.
    #[test]
    fn test_is_local_msg() {
        // (message type, expected `is_local_msg` result)
        let cases = [
            (MessageType::MsgHup, true),
            (MessageType::MsgBeat, true),
            (MessageType::MsgUnreachable, true),
            (MessageType::MsgSnapStatus, true),
            (MessageType::MsgCheckQuorum, true),
            (MessageType::MsgPropose, false),
            (MessageType::MsgAppend, false),
            (MessageType::MsgAppendResponse, false),
            (MessageType::MsgRequestVote, false),
            (MessageType::MsgRequestVoteResponse, false),
            (MessageType::MsgSnapshot, false),
            (MessageType::MsgHeartbeat, false),
            (MessageType::MsgHeartbeatResponse, false),
            (MessageType::MsgTransferLeader, false),
            (MessageType::MsgTimeoutNow, false),
            (MessageType::MsgReadIndex, false),
            (MessageType::MsgReadIndexResp, false),
            (MessageType::MsgRequestPreVote, false),
            (MessageType::MsgRequestPreVoteResponse, false),
        ];
        for (msg_type, expected) in cases {
            assert_eq!(is_local_msg(msg_type), expected);
        }
    }
}