1#![deny(unused_must_use)]
2#![cfg_attr(docsrs, feature(doc_cfg))]
3
4use anyhow::anyhow;
15use anyhow::Result;
16use async_trait::async_trait;
17use bytes::Bytes;
18use derive_builder::Builder;
19use derive_more::{Display, FromStr};
20use futures::stream::StreamExt;
21use std::collections::{BTreeMap, HashSet};
22use std::sync::atomic::{AtomicU64, Ordering};
23use std::sync::Arc;
24use std::time::{Duration, Instant};
25use tokio::sync::{Mutex, Notify, RwLock, Semaphore};
26
27mod ack;
28mod core_message;
29#[cfg(feature = "gateway")]
30#[cfg_attr(docsrs, doc(cfg(feature = "gateway")))]
31pub mod gateway;
33mod membership;
34mod proto;
35mod query_queue;
36mod quorum_join;
37mod server;
38#[cfg(feature = "simple")]
39#[cfg_attr(docsrs, doc(cfg(feature = "simple")))]
40pub mod simple;
42mod snapshot;
43pub mod storage;
45mod thread;
46mod thread_drop;
47
48use ack::Ack;
49use storage::RaftStorage;
50use tonic::transport::Endpoint;
51
/// Re-exports the tonic/prost-generated types for the `lol_core` proto package
/// under a shorter internal path.
mod proto_compiled {
    pub use crate::proto::lol_core::*;
}
55
/// Public request/response message types clients use to talk to a server.
pub mod api {
    pub use crate::proto_compiled::{
        AddServerRep, AddServerReq, ApplyRep, ApplyReq, ClusterInfoRep, ClusterInfoReq, CommitRep,
        CommitReq, GetConfigRep, GetConfigReq, RemoveServerRep, RemoveServerReq, StatusRep,
        StatusReq, TimeoutNowRep, TimeoutNowReq, TuneConfigRep, TuneConfigReq,
    };
}
64
65pub use crate::proto_compiled::raft_client::RaftClient;
66
67use storage::{Ballot, Entry};
68
/// Instruction returned by `RaftApp::process_write` telling the core whether
/// (and how) a snapshot should be taken after the entry is applied.
pub enum MakeSnapshot {
    /// Do not make a snapshot.
    None,
    /// Take a snapshot by copying the current app state
    /// (presumably the `save_snapshot`/`open_snapshot` path — confirm with core loop).
    CopySnapshot,
    /// Take a snapshot by folding preceding log entries
    /// (presumably the `fold_snapshot` path — confirm with core loop).
    FoldSnapshot,
}
80
/// The application state machine replicated by this Raft implementation.
///
/// The core drives this trait: committed entries are fed to `process_write`
/// (see `Log::advance_last_applied`), read queries to `process_read`, and
/// snapshot lifecycle events to the snapshot methods.
#[async_trait]
pub trait RaftApp: Sync + Send + 'static {
    /// Execute a read-only request against the currently applied state and
    /// return the serialized response.
    async fn process_read(&self, request: &[u8]) -> Result<Vec<u8>>;

    /// Apply the committed entry at `entry_index` to the state machine.
    /// Returns the serialized response together with an instruction telling
    /// the core whether to take a snapshot afterwards.
    async fn process_write(
        &self,
        request: &[u8],
        entry_index: Index,
    ) -> Result<(Vec<u8>, MakeSnapshot)>;

    /// Reset the state machine to the snapshot at the given index.
    /// `None` is passed when there is no app snapshot to restore
    /// (the core passes `None` for the initial snapshot entry at index 1).
    async fn install_snapshot(&self, snapshot: Option<Index>) -> Result<()>;

    /// Produce a new snapshot at `snapshot_index` by folding `requests`
    /// on top of `old_snapshot` (`None` = fold from scratch).
    async fn fold_snapshot(
        &self,
        old_snapshot: Option<Index>,
        requests: Vec<&[u8]>,
        snapshot_index: Index,
    ) -> Result<()>;

    /// Persist a snapshot received as a byte stream (e.g. fetched from the
    /// leader — see `RaftCore::fetch_snapshot`) under `snapshot_index`.
    async fn save_snapshot(&self, st: SnapshotStream, snapshot_index: Index) -> Result<()>;

    /// Open the snapshot at index `x` as a byte stream for sending to peers.
    async fn open_snapshot(&self, x: Index) -> Result<SnapshotStream>;

    /// Delete the snapshot at index `x` (called during compaction/cleanup).
    async fn delete_snapshot(&self, x: Index) -> Result<()>;
}
129
/// A snapshot's contents as a stream of fallible `Bytes` chunks.
pub type SnapshotStream =
    std::pin::Pin<Box<dyn futures::stream::Stream<Item = anyhow::Result<Bytes>> + Send>>;
134
/// Raft term number.
type Term = u64;
/// Position in the replicated log.
pub type Index = u64;

/// A `(term, index)` pair identifying one log entry.
///
/// The hand-written `PartialEq` impl was field-by-field equality, i.e.
/// exactly what `#[derive(PartialEq)]` generates (and `Eq` was already
/// derived), so it is replaced by the derive.
#[derive(Clone, Copy, PartialEq, Eq)]
struct Clock {
    term: Term,
    index: Index,
}
148
149pub use tonic::transport::Uri;
150
151#[derive(
152 serde::Serialize, serde::Deserialize, Clone, PartialEq, Eq, Hash, Debug, Display, FromStr,
153)]
154struct Id(#[serde(with = "http_serde::uri")] Uri);
155impl Id {
156 fn uri(&self) -> &Uri {
157 &self.0
158 }
159}
160impl From<Uri> for Id {
161 fn from(x: Uri) -> Id {
162 Id(x)
163 }
164}
165
#[cfg(test)]
mod tests {
    use super::*;

    /// `Uri` parsing normalizes a missing trailing slash: the parsed values
    /// compare equal while the raw strings do not.
    #[test]
    fn test_uri() {
        let without_slash = "https://192.168.1.1:8000";
        let parsed_a: Uri = without_slash.parse().unwrap();
        let with_slash = "https://192.168.1.1:8000/";
        let parsed_b: Uri = with_slash.parse().unwrap();
        assert_eq!(parsed_a, parsed_b);
        assert_eq!(parsed_a.to_string(), with_slash);
        assert_ne!(without_slash, with_slash);
    }

    /// `Id` round-trips through its `FromStr` implementation.
    #[test]
    fn test_id() {
        let text = "https://192.168.100.200:8080";
        let id: Id = text.parse().unwrap();
        let text_owned = text.to_string();
        assert_eq!(text, text_owned);
        let reparsed: Id = text_owned.parse().unwrap();
        assert_eq!(id, reparsed);
    }

    /// `Id` round-trips through bincode (exercises the http_serde adapter).
    #[test]
    fn test_id_serde() {
        let id: Id = "http://192.168.1.1:8080".parse().unwrap();
        let encoded = bincode::serialize(&id).unwrap();
        let decoded: Id = bincode::deserialize(&encoded).unwrap();
        assert_eq!(id, decoded);
    }
}
201
/// Payload of a log entry; bincode-encoded before being stored/replicated
/// (see `Command::serialize`/`deserialize`).
#[derive(serde::Serialize, serde::Deserialize)]
enum Command<'a> {
    /// Entry a new leader appends to commit its term (see `after_votes` /
    /// `commit_safe_term`).
    Noop,
    /// Snapshot entry marking a compacted log prefix; carries the cluster
    /// membership as of that point.
    Snapshot {
        membership: HashSet<Id>,
    },
    /// Membership-change entry.
    ClusterConfiguration {
        membership: HashSet<Id>,
    },
    /// Client request. `core == true` marks core-internal messages
    /// (asserted false on the app apply path in `advance_last_applied`).
    Req {
        core: bool,
        // Borrowed slice so serialization avoids copying the message body.
        #[serde(with = "serde_bytes")]
        message: &'a [u8],
    },
}
217impl<'a> Command<'a> {
218 fn serialize(x: &Command) -> Bytes {
219 bincode::serialize(x).unwrap().into()
220 }
221 fn deserialize(x: &[u8]) -> Command {
222 bincode::deserialize(x).unwrap()
223 }
224}
/// Role of this node in the current term.
#[derive(Clone, Copy)]
enum ElectionState {
    Leader,
    Candidate,
    Follower,
}
231
/// Runtime-tunable parameters. Construct via the derived `ConfigBuilder`.
#[derive(Builder)]
pub struct Config {
    // Seconds between compaction attempts; defaults to 300.
    #[builder(default = "300")]
    compaction_interval_sec: u64,
}
242impl Config {
243 fn compaction_interval(&self) -> Duration {
244 Duration::from_secs(self.compaction_interval_sec)
245 }
246 fn snapshot_insertion_delay(&self) -> Duration {
247 let minv = Duration::from_secs(10);
250 let v = Duration::from_secs(self.compaction_interval_sec / 2);
251 std::cmp::max(minv, v)
252 }
253}
254
/// Phi-accrual failure detector tracking heartbeats from one node.
struct FailureDetector {
    // The node whose heartbeats are being watched (the current leader,
    // or ourselves before a leader is known).
    watch_id: Id,
    detector: phi_detector::PingWindow,
}
impl FailureDetector {
    /// Start watching `id`, seeding the ping window with a single 1-second
    /// interval so phi is computable before any real heartbeat arrives.
    fn watch(id: Id) -> Self {
        Self {
            watch_id: id,
            detector: phi_detector::PingWindow::new(&[Duration::from_secs(1)], Instant::now()),
        }
    }
}
267
/// Shared state of a single Raft node. Wrapped in `Arc` and shared between
/// the RPC server and background tasks.
struct RaftCore {
    id: Id,
    app: Box<dyn RaftApp>,
    // Read queries parked until the log is applied past their commit point.
    query_queue: Mutex<query_queue::QueryQueue>,
    log: Log,
    election_state: RwLock<ElectionState>,
    cluster: RwLock<membership::Cluster>,
    config: RwLock<Config>,
    // Binary semaphore serializing ballot read-modify-write cycles.
    vote_token: Semaphore,
    // Highest term whose noop entry is committed; `queue_entry` rejects
    // writes in newer terms until their noop commits.
    safe_term: AtomicU64,
    // Index of the latest membership-change entry; a new change is only
    // allowed once this index is committed (`allow_new_membership_change`).
    membership_barrier: AtomicU64,

    failure_detector: RwLock<FailureDetector>,
}
impl RaftCore {
    /// Build a core: recover the last persisted membership from storage,
    /// initialize the log, and start the failure detector watching ourselves
    /// (rewatched once a leader is learned).
    async fn new(
        app: impl RaftApp,
        storage: impl RaftStorage,
        id: Uri,
        config: Config,
    ) -> Arc<Self> {
        let id: Id = id.into();
        let init_cluster = membership::Cluster::empty(id.clone()).await;
        let (membership_index, init_membership) =
            Self::find_last_membership(&storage).await.unwrap();
        let init_log = Log::new(storage).await;
        let fd = FailureDetector::watch(id.clone());
        let r = Arc::new(Self {
            app: Box::new(app),
            query_queue: Mutex::new(query_queue::QueryQueue::new()),
            id,
            log: init_log,
            election_state: RwLock::new(ElectionState::Follower),
            cluster: RwLock::new(init_cluster),
            config: RwLock::new(config),
            vote_token: Semaphore::new(1),
            safe_term: 0.into(),
            membership_barrier: 0.into(),
            failure_detector: RwLock::new(fd),
        });
        log::info!(
            "initial membership is {:?} at {}",
            init_membership,
            membership_index
        );
        // Applying the recovered membership needs an Arc<Self>, so it is
        // done after construction.
        r.set_membership(&init_membership, membership_index)
            .await
            .unwrap();
        r
    }
    /// Scan the log backwards for the most recent membership-defining entry
    /// (a snapshot or a cluster-configuration command).
    /// Returns `(0, {})` when no such entry exists.
    async fn find_last_membership<S: RaftStorage>(storage: &S) -> Result<(Index, HashSet<Id>)> {
        let last = storage.get_last_index().await?;
        let mut ret = (0, HashSet::new());
        for i in (1..=last).rev() {
            let e = storage.get_entry(i).await?.unwrap();
            match Command::deserialize(&e.command) {
                Command::Snapshot { membership } => {
                    ret = (i, membership);
                    break;
                }
                Command::ClusterConfiguration { membership } => {
                    ret = (i, membership);
                    break;
                }
                _ => {}
            }
        }
        Ok(ret)
    }
    /// Bootstrap a brand-new single-node cluster: an empty snapshot at
    /// index 1, a membership entry containing only this node at index 2,
    /// then a TimeoutNow to ourselves to trigger an immediate election.
    async fn init_cluster(self: &Arc<Self>) -> Result<()> {
        let snapshot = Entry {
            prev_clock: Clock { term: 0, index: 0 },
            this_clock: Clock { term: 0, index: 1 },
            command: Command::serialize(&Command::Snapshot {
                membership: HashSet::new(),
            }),
        };
        self.log.insert_snapshot(snapshot).await?;

        let mut membership = HashSet::new();
        membership.insert(self.id.clone());
        let add_server = Entry {
            prev_clock: Clock { term: 0, index: 1 },
            this_clock: Clock { term: 0, index: 2 },
            command: Command::serialize(&Command::ClusterConfiguration {
                membership: membership.clone(),
            }),
        };
        self.log.insert_entry(add_server).await?;
        self.set_membership(&membership, 2).await?;

        // Kick off an election right away instead of waiting for a timeout.
        self.send_timeout_now(self.id.clone());

        Ok(())
    }
    /// A new membership change is allowed only after the previous one
    /// (tracked by `membership_barrier`) has been committed.
    fn allow_new_membership_change(&self) -> bool {
        self.log.commit_index.load(Ordering::SeqCst)
            >= self.membership_barrier.load(Ordering::SeqCst)
    }
    /// Apply `membership` to the cluster view and record `index` as the
    /// pending membership barrier.
    async fn set_membership(
        self: &Arc<Self>,
        membership: &HashSet<Id>,
        index: Index,
    ) -> Result<()> {
        log::info!("change membership to {:?}", membership);
        self.cluster
            .write()
            .await
            .set_membership(&membership, Arc::clone(&self))
            .await?;
        self.membership_barrier.store(index, Ordering::SeqCst);
        Ok(())
    }
    /// Register a read query keyed to the current commit index, then try to
    /// execute any queries already satisfied by `last_applied`.
    async fn register_query(self: &Arc<Self>, core: bool, message: Bytes, ack: Ack) {
        let query = query_queue::Query { core, message, ack };
        self.query_queue
            .lock()
            .await
            .register(self.log.commit_index.load(Ordering::SeqCst), query);
        // Re-lock rather than holding the lock across both calls; queries
        // registered in between are picked up by this execute pass.
        self.query_queue
            .lock()
            .await
            .execute(
                self.log.last_applied.load(Ordering::SeqCst),
                Arc::clone(&self),
            )
            .await;
    }
}
/// Outcome of `Log::try_insert_entry` for one replicated entry.
enum TryInsertResult {
    /// Entry was written to the log (possibly overwriting a conflict).
    Inserted,
    /// Identical entry already present; nothing to do.
    Skipped,
    /// Entry does not chain onto our log; replication must back off.
    Rejected,
}
/// A batch of log entries streamed from `sender_id`, preceded by the clock
/// of the entry just before the batch (for consistency checking).
struct LogStream {
    sender_id: Id,
    prev_log_term: Term,
    prev_log_index: Index,
    entries: std::pin::Pin<Box<dyn futures::stream::Stream<Item = LogStreamElem> + Send>>,
}
/// One entry within a `LogStream`.
struct LogStreamElem {
    term: Term,
    index: Index,
    command: Bytes,
}
425
426fn into_out_stream(
427 x: LogStream,
428) -> impl futures::stream::Stream<Item = crate::proto_compiled::AppendEntryReq> {
429 use crate::proto_compiled::{append_entry_req::Elem, AppendStreamEntry, AppendStreamHeader};
430 let header_stream = vec![Elem::Header(AppendStreamHeader {
431 sender_id: x.sender_id.to_string(),
432 prev_log_index: x.prev_log_index,
433 prev_log_term: x.prev_log_term,
434 })];
435 let header_stream = futures::stream::iter(header_stream);
436 let chunk_stream = x.entries.map(|e| {
437 Elem::Entry(AppendStreamEntry {
438 term: e.term,
439 index: e.index,
440 command: e.command,
441 })
442 });
443 header_stream
444 .chain(chunk_stream)
445 .map(|e| crate::proto_compiled::AppendEntryReq { elem: Some(e) })
446}
impl RaftCore {
    /// If `command` is a membership-defining entry (snapshot or cluster
    /// configuration), adopt its membership at `index`.
    async fn change_membership(self: &Arc<Self>, command: Bytes, index: Index) -> Result<()> {
        match Command::deserialize(&command) {
            Command::Snapshot { membership } => {
                self.set_membership(&membership, index).await?;
            }
            Command::ClusterConfiguration { membership } => {
                self.set_membership(&membership, index).await?;
            }
            _ => {}
        }
        Ok(())
    }
    /// Mark `term` as safe: its noop entry has committed, so client writes
    /// in this term may proceed (see `queue_entry`).
    fn commit_safe_term(&self, term: Term) {
        log::info!("noop entry for term {} is successfully committed", term);
        self.safe_term.fetch_max(term, Ordering::SeqCst);
    }
    /// Leader-side append of a new entry. Rejected until the current term's
    /// noop entry is committed, so earlier-term entries cannot race ahead.
    async fn queue_entry(self: &Arc<Self>, command: Bytes, ack: Option<Ack>) -> Result<()> {
        let term = self.load_ballot().await?.cur_term;
        let cur_safe_term = self.safe_term.load(Ordering::SeqCst);
        if cur_safe_term < term {
            return Err(anyhow!(
                "noop entry for term {} isn't committed yet. (> {})",
                term,
                cur_safe_term
            ));
        }
        let append_index = self
            .log
            .append_new_entry(command.clone(), ack, term)
            .await?;
        // Wake the replication loop so followers receive the entry promptly.
        self.log.replication_notify.notify_one();

        self.change_membership(command, append_index).await?;
        Ok(())
    }
    /// Follower-side ingestion of a streamed batch. Entries are chained via
    /// `prev_clock`; returns `Ok(false)` when an entry is rejected so the
    /// leader backs off its next_index.
    async fn queue_received_entry(self: &Arc<Self>, mut req: LogStream) -> Result<bool> {
        let mut prev_clock = Clock {
            term: req.prev_log_term,
            index: req.prev_log_index,
        };
        while let Some(e) = req.entries.next().await {
            let entry = Entry {
                prev_clock,
                this_clock: Clock {
                    term: e.term,
                    index: e.index,
                },
                command: e.command,
            };
            let insert_index = entry.this_clock.index;
            let command = entry.command.clone();
            match self
                .log
                .try_insert_entry(entry, req.sender_id.clone(), Arc::clone(&self))
                .await?
            {
                TryInsertResult::Inserted => {
                    self.change_membership(command, insert_index).await?;
                }
                TryInsertResult::Skipped => {}
                TryInsertResult::Rejected => {
                    log::warn!("rejected append entry (clock={:?})", (e.term, e.index));
                    return Ok(false);
                }
            }
            prev_clock = Clock {
                term: e.term,
                index: e.index,
            };
        }
        Ok(true)
    }
    /// Build a `LogStream` for entries in the half-open range [l, r).
    /// The first entry is read eagerly (it supplies prev_log_*); the rest
    /// are yielded lazily from storage as the stream is consumed.
    async fn prepare_replication_stream(self: &Arc<Self>, l: Index, r: Index) -> Result<LogStream> {
        let head = self.log.storage.get_entry(l).await?.unwrap();
        let Clock {
            term: prev_log_term,
            index: prev_log_index,
        } = head.prev_clock;
        let Clock { term, index } = head.this_clock;
        let e = LogStreamElem {
            term,
            index,
            command: head.command,
        };
        let st1 = futures::stream::iter(vec![e]);
        let core = Arc::clone(&self);
        let st2 = async_stream::stream! {
            for idx in l + 1..r {
                let x = core.log.storage.get_entry(idx).await.unwrap().unwrap();
                let Clock { term, index } = x.this_clock;
                let e = LogStreamElem {
                    term,
                    index,
                    command: x.command,
                };
                yield e;
            }
        };
        let st = st1.chain(st2);
        Ok(LogStream {
            sender_id: self.id.clone(),
            prev_log_term,
            prev_log_index,
            entries: Box::pin(st),
        })
    }
    /// One replication round toward `follower_id`. Returns `Ok(false)` when
    /// the follower is already up to date (nothing sent), `Ok(true)` otherwise.
    async fn advance_replication(self: &Arc<Self>, follower_id: Id) -> Result<bool> {
        let peer = self
            .cluster
            .read()
            .await
            .peers
            .get(&follower_id)
            .unwrap()
            .clone();

        let old_progress = peer.progress;
        let cur_last_log_index = self.log.get_last_log_index().await?;

        // Nothing new to replicate.
        let should_send = cur_last_log_index >= old_progress.next_index;
        if !should_send {
            return Ok(false);
        }

        // The follower's next entry was compacted away: restart its
        // progress at the snapshot index so the snapshot gets replicated.
        let cur_snapshot_index = self.log.get_snapshot_index();
        if old_progress.next_index < cur_snapshot_index {
            log::warn!(
                "entry not found at next_index (idx={}) for {}",
                old_progress.next_index,
                follower_id.uri(),
            );
            let new_progress = membership::ReplicationProgress::new(cur_snapshot_index);
            let mut cluster = self.cluster.write().await;
            cluster.peers.get_mut(&follower_id).unwrap().progress = new_progress;
            return Ok(true);
        }

        // Send up to next_max_cnt entries, capped by what exists.
        let n_max_possible = cur_last_log_index - old_progress.next_index + 1;
        let n = std::cmp::min(old_progress.next_max_cnt, n_max_possible);
        assert!(n >= 1);

        let in_stream = self
            .prepare_replication_stream(old_progress.next_index, old_progress.next_index + n)
            .await?;

        let res: Result<_> = async {
            let endpoint =
                Endpoint::from(follower_id.uri().clone()).connect_timeout(Duration::from_secs(5));
            let mut conn = RaftClient::connect(endpoint).await?;
            let out_stream = into_out_stream(in_stream);
            let res = conn.send_append_entry(out_stream).await?;
            Ok(res)
        }
        .await;

        let mut incremented = false;
        let new_progress = if res.is_ok() {
            let res = res.unwrap();
            match res.into_inner() {
                // Success: advance match/next and grow the batch size.
                crate::proto_compiled::AppendEntryRep { success: true, .. } => {
                    incremented = true;
                    membership::ReplicationProgress {
                        match_index: old_progress.next_index + n - 1,
                        next_index: old_progress.next_index + n,
                        next_max_cnt: n * 2,
                    }
                }
                // Rejected: back next_index off (at most to just past the
                // follower's last log) and shrink the batch to 1.
                crate::proto_compiled::AppendEntryRep {
                    success: false,
                    last_log_index,
                } => membership::ReplicationProgress {
                    match_index: old_progress.match_index,
                    next_index: std::cmp::min(old_progress.next_index - 1, last_log_index + 1),
                    next_max_cnt: 1,
                },
            }
        } else {
            // RPC failure: keep progress unchanged and retry later.
            old_progress
        };

        {
            let mut cluster = self.cluster.write().await;
            cluster.peers.get_mut(&follower_id).unwrap().progress = new_progress;
        }
        if incremented {
            self.log.replication_notify.notify_one();
        }

        Ok(true)
    }
    /// Majority agreement: the median of {our last index} ∪ {peer match
    /// indices} is the highest index replicated on a majority.
    async fn find_new_agreement(&self) -> Result<Index> {
        let mut match_indices = vec![];

        let last_log_index = self.log.get_last_log_index().await?;
        match_indices.push(last_log_index);

        for (_, peer) in self.cluster.read().await.peers.clone() {
            match_indices.push(peer.progress.match_index);
        }

        match_indices.sort();
        match_indices.reverse();
        let mid = match_indices.len() / 2;
        let new_agreement = match_indices[mid];
        Ok(new_agreement)
    }
}
impl RaftCore {
    /// Fetch the app snapshot at `snapshot_index` from node `to` and hand
    /// the byte stream to the app to persist.
    async fn fetch_snapshot(&self, snapshot_index: Index, to: Id) -> Result<()> {
        let endpoint = Endpoint::from(to.uri().clone()).connect_timeout(Duration::from_secs(5));

        let mut conn = RaftClient::connect(endpoint).await?;
        let req = proto_compiled::GetSnapshotReq {
            index: snapshot_index,
        };
        let res = conn.get_snapshot(req).await?;
        let out_stream = res.into_inner();
        let in_stream = Box::pin(snapshot::into_in_stream(out_stream));
        self.app.save_snapshot(in_stream, snapshot_index).await?;
        Ok(())
    }
    /// Open the local app snapshot at `snapshot_index` for sending to a peer.
    async fn make_snapshot_stream(&self, snapshot_index: Index) -> Result<Option<SnapshotStream>> {
        let st = self.app.open_snapshot(snapshot_index).await?;
        Ok(Some(st))
    }
}
impl RaftCore {
    /// Persist the ballot (current term + voted_for).
    async fn save_ballot(&self, v: Ballot) -> Result<()> {
        self.log.storage.save_ballot(v).await
    }
    /// Load the persisted ballot.
    async fn load_ballot(&self) -> Result<Ballot> {
        self.log.storage.load_ballot().await
    }
    /// Phi-accrual election timeout: true when the suspicion level (phi) of
    /// the watched node exceeds the fixed threshold 3.
    async fn detect_election_timeout(&self) -> bool {
        let fd = &self.failure_detector.read().await.detector;
        let normal_dist = fd.normal_dist();
        let phi = normal_dist.phi(Instant::now() - fd.last_ping());
        phi > 3.
    }
    /// Decide whether to grant a (pre-)vote to `candidate_id`.
    /// When `pre_vote` is set, no state is persisted or mutated.
    async fn receive_vote(
        &self,
        candidate_term: Term,
        candidate_id: Id,
        candidate_last_log_clock: Clock,
        force_vote: bool,
        pre_vote: bool,
    ) -> Result<bool> {
        let allow_side_effects = !pre_vote;

        // Unless forced (leadership transfer), only vote when we ourselves
        // suspect the current leader is down.
        if !force_vote {
            if !self.detect_election_timeout().await {
                return Ok(false);
            }
        }

        // Serialize ballot read-modify-write with other vote handling.
        let _vote_guard = self.vote_token.acquire().await;

        let mut ballot = self.load_ballot().await?;
        if candidate_term < ballot.cur_term {
            log::warn!("candidate term is older. reject vote");
            return Ok(false);
        }

        if candidate_term > ballot.cur_term {
            log::warn!("received newer term. reset vote");
            ballot.cur_term = candidate_term;
            ballot.voted_for = None;
            if allow_side_effects {
                *self.election_state.write().await = ElectionState::Follower;
            }
        }

        let cur_last_index = self.log.get_last_log_index().await?;

        // Clock of our last entry; (0, 0) when the log is empty.
        let this_last_log_clock = self
            .log
            .storage
            .get_entry(cur_last_index)
            .await?
            .map(|x| x.this_clock)
            .unwrap_or(Clock { term: 0, index: 0 });

        // Raft's up-to-date check: higher last term wins; equal terms
        // compare last index.
        let candidate_win = match candidate_last_log_clock.term.cmp(&this_last_log_clock.term) {
            std::cmp::Ordering::Greater => true,
            std::cmp::Ordering::Equal => {
                candidate_last_log_clock.index >= this_last_log_clock.index
            }
            std::cmp::Ordering::Less => false,
        };

        if !candidate_win {
            log::warn!("candidate clock is older. reject vote");
            if allow_side_effects {
                self.save_ballot(ballot).await?;
            }
            return Ok(false);
        }

        // Grant if we haven't voted this term, or already voted for this
        // same candidate.
        let grant = match &ballot.voted_for {
            None => {
                ballot.voted_for = Some(candidate_id.clone());
                true
            }
            Some(id) => {
                if id == &candidate_id {
                    true
                } else {
                    false
                }
            }
        };

        if allow_side_effects {
            self.save_ballot(ballot).await?;
        }
        log::info!(
            "voted response to {} = grant: {}",
            candidate_id.uri(),
            grant
        );
        Ok(grant)
    }
    /// Ask all other members for votes in `aim_term`; resolves true once a
    /// majority (counting our own implicit vote when we are a member) grants.
    async fn request_votes(
        self: &Arc<Self>,
        aim_term: Term,
        force_vote: bool,
        pre_vote: bool,
    ) -> Result<bool> {
        let (others, remaining) = {
            let membership = self.cluster.read().await.membership.clone();
            let n = membership.len();
            let majority = (n / 2) + 1;
            let include_self = membership.contains(&self.id);
            let mut others = vec![];
            for id in membership {
                if id != self.id {
                    others.push(id);
                }
            }
            // Our own vote counts toward the majority when we are a member.
            let m = if include_self { majority - 1 } else { majority };
            (others, m)
        };

        let last_log_index = self.log.get_last_log_index().await?;
        let last_log_clock = self
            .log
            .storage
            .get_entry(last_log_index)
            .await?
            .unwrap()
            .this_clock;

        let mut vote_requests = vec![];
        for endpoint in others {
            let myid = self.id.clone();
            vote_requests.push(async move {
                let Clock {
                    term: last_log_term,
                    index: last_log_index,
                } = last_log_clock;
                let req = crate::proto_compiled::RequestVoteReq {
                    term: aim_term,
                    candidate_id: myid.to_string(),
                    last_log_term,
                    last_log_index,
                    force_vote,
                    pre_vote,
                };
                let res: Result<_> = async {
                    let endpoint =
                        Endpoint::from(endpoint.uri().clone()).timeout(Duration::from_secs(5));
                    let mut conn = RaftClient::connect(endpoint).await?;
                    let res = conn.request_vote(req).await?;
                    Ok(res)
                }
                .await;
                // RPC failures count as a denied vote.
                match res {
                    Ok(res) => res.into_inner().vote_granted,
                    Err(_) => false,
                }
            });
        }
        let ok = quorum_join::quorum_join(remaining, vote_requests).await;
        Ok(ok)
    }
    /// Transition after an election round: on success, append this term's
    /// noop entry, reset every peer's replication progress, and become
    /// leader; otherwise fall back to follower.
    async fn after_votes(self: &Arc<Self>, aim_term: Term, ok: bool) -> Result<()> {
        if ok {
            log::info!("got enough votes from the cluster. promoted to leader");

            let index = self
                .log
                .append_new_entry(Command::serialize(&Command::Noop), None, aim_term)
                .await?;
            self.membership_barrier.store(index, Ordering::SeqCst);

            {
                let initial_progress =
                    membership::ReplicationProgress::new(self.log.get_last_log_index().await?);
                let mut cluster = self.cluster.write().await;
                for (_, peer) in &mut cluster.peers {
                    peer.progress = initial_progress.clone();
                }
            }

            *self.election_state.write().await = ElectionState::Leader;
        } else {
            log::info!("failed to become leader. now back to follower");
            *self.election_state.write().await = ElectionState::Follower;
        }
        Ok(())
    }
    /// Run a pre-vote round and, if it succeeds, a real election at the
    /// same target term. The real round is abandoned if the term moved on
    /// between the two rounds.
    async fn try_promote(self: &Arc<Self>, force_vote: bool) -> Result<()> {
        let pre_aim_term = {
            let _ballot_guard = self.vote_token.acquire().await;
            let ballot = self.load_ballot().await?;
            ballot.cur_term + 1
        };

        log::info!("start pre-vote. try promote at term {}", pre_aim_term);
        let ok = self
            .request_votes(pre_aim_term, force_vote, true)
            .await
            .unwrap_or(false);
        if !ok {
            log::info!("pre-vote failed for term {}", pre_aim_term);
            return Ok(());
        }

        let aim_term = {
            let ballot_guard = self.vote_token.acquire().await;
            let mut new_ballot = self.load_ballot().await?;
            let aim_term = new_ballot.cur_term + 1;

            // Someone else bumped the term since the pre-vote; give up.
            if aim_term != pre_aim_term {
                return Ok(());
            }

            // Vote for ourselves and persist before campaigning.
            new_ballot.cur_term = aim_term;
            new_ballot.voted_for = Some(self.id.clone());

            self.save_ballot(new_ballot).await?;
            drop(ballot_guard);

            *self.election_state.write().await = ElectionState::Candidate;
            aim_term
        };

        log::info!("start election. try promote at term {}", aim_term);

        let ok = self
            .request_votes(aim_term, force_vote, false)
            .await
            .unwrap_or(false);
        self.after_votes(aim_term, ok).await?;

        Ok(())
    }
    /// Best-effort heartbeat to one follower; failures are only logged.
    async fn send_heartbeat(&self, follower_id: Id) -> Result<()> {
        let endpoint = Endpoint::from(follower_id.uri().clone()).timeout(Duration::from_secs(5));
        let req = {
            let term = self.load_ballot().await?.cur_term;
            proto_compiled::HeartbeatReq {
                term,
                leader_id: self.id.to_string(),
                leader_commit: self.log.commit_index.load(Ordering::SeqCst),
            }
        };
        if let Ok(mut conn) = RaftClient::connect(endpoint).await {
            let res = conn.send_heartbeat(req).await;
            if res.is_err() {
                log::warn!("heartbeat to {} failed", follower_id.uri());
            }
        }
        Ok(())
    }
    /// Feed a received heartbeat into the failure detector's ping window.
    async fn record_heartbeat(&self) {
        self.failure_detector
            .write()
            .await
            .detector
            .add_ping(Instant::now())
    }
    /// If the leader changed, start a fresh failure detector for it.
    async fn reset_failure_detector(&self, leader_id: Id) {
        let cur_watch_id = self.failure_detector.read().await.watch_id.clone();
        if cur_watch_id != leader_id {
            *self.failure_detector.write().await = FailureDetector::watch(leader_id);
        }
    }
    /// Follower-side heartbeat handling: reject stale terms, adopt newer
    /// terms, learn the leader, and advance the commit index toward
    /// `leader_commit` (capped by our own last log index).
    async fn receive_heartbeat(
        self: &Arc<Self>,
        leader_id: Id,
        leader_term: Term,
        leader_commit: Index,
    ) -> Result<()> {
        let ballot_guard = self.vote_token.acquire().await;
        let mut ballot = self.load_ballot().await?;
        if leader_term < ballot.cur_term {
            log::warn!("heartbeat is stale. rejected");
            return Ok(());
        }

        self.record_heartbeat().await;
        self.reset_failure_detector(leader_id.clone()).await;

        if leader_term > ballot.cur_term {
            log::warn!("received heartbeat with newer term. reset ballot");
            ballot.cur_term = leader_term;
            ballot.voted_for = None;
            *self.election_state.write().await = ElectionState::Follower;
        }

        if ballot.voted_for != Some(leader_id.clone()) {
            log::info!("learn the current leader ({})", leader_id.uri());
            ballot.voted_for = Some(leader_id);
        }

        self.save_ballot(ballot).await?;
        drop(ballot_guard);

        let new_commit_index = std::cmp::min(leader_commit, self.log.get_last_log_index().await?);
        self.log
            .advance_commit_index(new_commit_index, Arc::clone(&self))
            .await?;

        Ok(())
    }
    /// Hand leadership to the most caught-up peer (highest match_index)
    /// by sending it TimeoutNow.
    async fn transfer_leadership(&self) {
        let mut xs = vec![];
        let peers = self.cluster.read().await.peers.clone();
        for (id, peer) in peers {
            let progress = peer.progress;
            xs.push((progress.match_index, id));
        }
        xs.sort_by_key(|x| x.0);

        if let Some((_, id)) = xs.pop() {
            self.send_timeout_now(id);
        }
    }
    /// Fire-and-forget TimeoutNow: tells `id` to start an election
    /// immediately. Errors are deliberately ignored.
    fn send_timeout_now(&self, id: Id) {
        tokio::spawn(async move {
            let endpoint = Endpoint::from(id.uri().clone()).timeout(Duration::from_secs(5));
            if let Ok(mut conn) = RaftClient::connect(endpoint).await {
                let req = proto_compiled::TimeoutNowReq {};
                let _ = conn.timeout_now(req).await;
            }
        });
    }
}
1034struct Log {
1035 storage: Box<dyn RaftStorage>,
1036 ack_chans: RwLock<BTreeMap<Index, Ack>>,
1037
1038 snapshot_index: AtomicU64, last_applied: AtomicU64, commit_index: AtomicU64, append_token: Semaphore,
1043 commit_token: Semaphore,
1044 compaction_token: Semaphore,
1045
1046 append_notify: Notify,
1047 replication_notify: Notify,
1048 commit_notify: Notify,
1049 apply_notify: Notify,
1050
1051 applied_membership: Mutex<HashSet<Id>>,
1052 snapshot_queue: snapshot::SnapshotQueue,
1053
1054 apply_error_seq: AtomicU64,
1055}
1056impl Log {
    /// Open the log over `storage`, recovering the last snapshot index.
    /// commit/apply start just below the snapshot so the snapshot entry
    /// itself is (re-)applied on startup — TODO confirm against apply loop.
    async fn new(storage: impl RaftStorage) -> Self {
        let snapshot_index = match storage::find_last_snapshot_index(&storage)
            .await
            .expect("failed to find initial snapshot index")
        {
            Some(x) => x,
            None => 0,
        };
        // 0 means "no snapshot"; otherwise start one entry before it.
        let start_index = if snapshot_index == 0 {
            0
        } else {
            snapshot_index - 1
        };
        Self {
            storage: Box::new(storage),
            ack_chans: RwLock::new(BTreeMap::new()),

            snapshot_index: snapshot_index.into(),
            last_applied: start_index.into(),
            commit_index: start_index.into(),

            append_token: Semaphore::new(1),
            commit_token: Semaphore::new(1),
            compaction_token: Semaphore::new(1),

            append_notify: Notify::new(),
            replication_notify: Notify::new(),
            commit_notify: Notify::new(),
            apply_notify: Notify::new(),

            applied_membership: Mutex::new(HashSet::new()),
            snapshot_queue: snapshot::SnapshotQueue::new(),

            apply_error_seq: 0.into(),
        }
    }
    /// Index of the last entry in storage.
    async fn get_last_log_index(&self) -> Result<Index> {
        self.storage.get_last_index().await
    }
    /// Index of the current snapshot entry (0 = none).
    fn get_snapshot_index(&self) -> Index {
        self.snapshot_index.load(Ordering::SeqCst)
    }
    /// Leader-side append: write `command` as the next entry in `term`,
    /// optionally registering `ack` to be completed at commit/apply time.
    /// Returns the index of the new entry.
    async fn append_new_entry(
        &self,
        command: Bytes,
        ack: Option<Ack>,
        term: Term,
    ) -> Result<Index> {
        // Serialize appends so last-index reads and inserts can't interleave.
        let _token = self.append_token.acquire().await;

        let cur_last_log_index = self.storage.get_last_index().await?;
        let prev_clock = self
            .storage
            .get_entry(cur_last_log_index)
            .await?
            .unwrap()
            .this_clock;
        let new_index = cur_last_log_index + 1;
        let this_clock = Clock {
            term,
            index: new_index,
        };
        let e = Entry {
            prev_clock,
            this_clock,
            command,
        };
        self.insert_entry(e).await?;
        if let Some(x) = ack {
            self.ack_chans.write().await.insert(new_index, x);
        }
        self.append_notify.notify_waiters();
        Ok(new_index)
    }
    /// Follower-side insert of one replicated entry.
    ///
    /// Outcomes:
    /// - `Rejected` when the entry doesn't chain onto our log (prev mismatch
    ///   or gap), so the leader must back off — unless the entry is a
    ///   snapshot, which is allowed to re-anchor a too-old log;
    /// - `Skipped` when an identical entry is already stored;
    /// - `Inserted` otherwise (overwriting any conflicting suffix).
    async fn try_insert_entry(
        &self,
        entry: Entry,
        sender_id: Id,
        core: Arc<RaftCore>,
    ) -> Result<TryInsertResult> {
        let _token = self.append_token.acquire().await;

        let Clock {
            term: _,
            index: prev_index,
        } = entry.prev_clock;
        if let Some(prev_clock) = self
            .storage
            .get_entry(prev_index)
            .await?
            .map(|x| x.this_clock)
        {
            // The previous entry exists but with a different clock:
            // the logs diverge before this entry.
            if prev_clock != entry.prev_clock {
                return Ok(TryInsertResult::Rejected);
            }
        } else {
            // We don't have the previous entry at all. Only a snapshot
            // entry may be installed over such a gap.
            let command = entry.command.clone();
            if std::matches!(Command::deserialize(&command), Command::Snapshot { .. }) {
                let Clock {
                    term: _,
                    index: snapshot_index,
                } = entry.this_clock;
                log::warn!(
                    "log is too old. replicated a snapshot (idx={}) from leader",
                    snapshot_index
                );

                // Pull the app-level snapshot bytes from the sender first
                // (skip for self-sends and the trivial snapshot at index 1).
                if sender_id != core.id && snapshot_index > 1 {
                    if let Err(e) = core.fetch_snapshot(snapshot_index, sender_id.clone()).await {
                        log::error!(
                            "could not fetch app snapshot (idx={}) from sender {}",
                            snapshot_index,
                            sender_id.uri(),
                        );
                        return Err(e);
                    }
                }
                let inserted = self
                    .snapshot_queue
                    .insert(entry, Duration::from_millis(0))
                    .await;
                if !inserted.await {
                    anyhow::bail!("failed to insert snapshot entry (idx={})", snapshot_index);
                }

                // Rewind commit/apply to just below the snapshot so the
                // snapshot entry itself is applied next.
                self.commit_index
                    .store(snapshot_index - 1, Ordering::SeqCst);
                self.last_applied
                    .store(snapshot_index - 1, Ordering::SeqCst);

                return Ok(TryInsertResult::Inserted);
            } else {
                return Ok(TryInsertResult::Rejected);
            }
        }

        let Clock {
            term: _,
            index: new_index,
        } = entry.this_clock;

        if let Some(old_clock) = self
            .storage
            .get_entry(new_index)
            .await?
            .map(|e| e.this_clock)
        {
            if old_clock == entry.this_clock {
                // Exact duplicate of what we already have.
                Ok(TryInsertResult::Skipped)
            } else {
                // Conflict: our suffix from new_index on is invalid; drop
                // the pending acks for those overwritten indices.
                log::warn!("log conflicted at idx: {}", new_index);

                let old_last_log_index = self.storage.get_last_index().await?;
                for idx in new_index..old_last_log_index {
                    self.ack_chans.write().await.remove(&idx);
                }

                self.insert_entry(entry).await?;
                Ok(TryInsertResult::Inserted)
            }
        } else {
            self.insert_entry(entry).await?;
            Ok(TryInsertResult::Inserted)
        }
    }
    /// Write `e` to storage at its own clock's index.
    async fn insert_entry(&self, e: Entry) -> Result<()> {
        self.storage.insert_entry(e.this_clock.index, e).await?;
        Ok(())
    }
    /// Write a snapshot entry and bump `snapshot_index` monotonically
    /// (fetch_max: never moves backwards under races).
    async fn insert_snapshot(&self, e: Entry) -> Result<()> {
        let new_snapshot_index = e.this_clock.index;
        self.storage.insert_entry(e.this_clock.index, e).await?;
        self.snapshot_index
            .fetch_max(new_snapshot_index, Ordering::SeqCst);
        Ok(())
    }
1240 async fn advance_commit_index(&self, new_agreement: Index, core: Arc<RaftCore>) -> Result<()> {
1241 let _token = self.commit_token.acquire().await;
1242
1243 let old_agreement = self.commit_index.load(Ordering::SeqCst);
1244 if !(new_agreement > old_agreement) {
1245 return Ok(());
1246 }
1247
1248 for i in old_agreement + 1..=new_agreement {
1249 let e = self.storage.get_entry(i).await?.unwrap();
1250 let term = e.this_clock.term;
1251 match Command::deserialize(&e.command) {
1252 Command::ClusterConfiguration { membership } => {
1253 let remove_this_node = !membership.contains(&core.id);
1257 let is_last_membership_change =
1258 i == core.membership_barrier.load(Ordering::SeqCst);
1259 let is_leader =
1260 std::matches!(*core.election_state.read().await, ElectionState::Leader);
1261 if remove_this_node && is_last_membership_change && is_leader {
1262 *core.election_state.write().await = ElectionState::Follower;
1263
1264 core.transfer_leadership().await;
1267 }
1268 }
1269 Command::Noop => {
1270 core.commit_safe_term(term);
1271 }
1272 _ => {}
1273 }
1274
1275 let mut ack_chans = self.ack_chans.write().await;
1276 if !ack_chans.contains_key(&i) {
1277 continue;
1278 }
1279
1280 let ack = ack_chans.get(&i).unwrap();
1281 if std::matches!(ack, Ack::OnCommit(_)) {
1282 if let Ack::OnCommit(tx) = ack_chans.remove(&i).unwrap() {
1283 let _ = tx.send(ack::CommitOk);
1284 }
1285 }
1286 }
1287
1288 log::debug!("commit_index {} -> {}", old_agreement, new_agreement);
1289 self.commit_index.store(new_agreement, Ordering::SeqCst);
1290 self.commit_notify.notify_one();
1291 Ok(())
1292 }
1293 async fn advance_last_applied(&self, raft_core: Arc<RaftCore>) -> Result<()> {
1294 let (apply_index, apply_entry, command) = {
1295 let apply_index = self.last_applied.load(Ordering::SeqCst) + 1;
1296 let mut e = self.storage.get_entry(apply_index).await?.unwrap();
1297 let command = std::mem::take(&mut e.command);
1298 (apply_index, e, command)
1299 };
1300 let ok = match Command::deserialize(&command) {
1301 Command::Snapshot { membership } => {
1302 log::info!("install app snapshot");
1303 let snapshot = if apply_index == 1 {
1304 None
1305 } else {
1306 Some(apply_index)
1307 };
1308 let res = raft_core.app.install_snapshot(snapshot).await;
1309 log::info!("install app snapshot (complete)");
1310 let success = res.is_ok();
1311 if success {
1312 *self.applied_membership.lock().await = membership;
1313 true
1314 } else {
1315 false
1316 }
1317 }
1318 Command::Req { core, ref message } => {
1319 assert_eq!(core, false);
1320 let res = raft_core.app.process_write(message, apply_index).await;
1321 match res {
1322 Ok((msg, make_snapshot)) => {
1323 let mut ack_chans = self.ack_chans.write().await;
1324 if ack_chans.contains_key(&apply_index) {
1325 let ack = ack_chans.get(&apply_index).unwrap();
1326 if std::matches!(ack, Ack::OnApply(_)) {
1327 if let Ack::OnApply(tx) = ack_chans.remove(&apply_index).unwrap() {
1328 let _ = tx.send(ack::ApplyOk(msg));
1329 }
1330 }
1331 }
1332
1333 match make_snapshot {
1334 MakeSnapshot::CopySnapshot => {
1335 let snapshot_entry = Entry {
1337 command: Command::serialize(&Command::Snapshot {
1338 membership: self.applied_membership.lock().await.clone(),
1339 }),
1340 ..apply_entry
1341 };
1342 let delay =
1343 raft_core.config.read().await.snapshot_insertion_delay();
1344 log::info!(
1345 "copy snapshot is made and will be inserted in {:?}",
1346 delay
1347 );
1348
1349 let _ = self.snapshot_queue.insert(snapshot_entry, delay).await;
1350 }
1351 MakeSnapshot::FoldSnapshot => {
1352 tokio::spawn(async move {
1353 let _ = raft_core
1354 .log
1355 .create_fold_snapshot(apply_index, Arc::clone(&raft_core))
1356 .await;
1357 });
1358 }
1359 MakeSnapshot::None => {}
1360 }
1361 true
1362 }
1363 Err(e) => {
1364 log::error!("log apply error: {} (core={})", e, core);
1365 false
1366 }
1367 }
1368 }
1369 Command::ClusterConfiguration { membership } => {
1370 *self.applied_membership.lock().await = membership;
1371 true
1372 }
1373 _ => true,
1374 };
1375 if ok {
1376 self.apply_error_seq.store(0, Ordering::SeqCst);
1377
1378 log::debug!("last_applied -> {}", apply_index);
1379 self.last_applied.store(apply_index, Ordering::SeqCst);
1380 self.apply_notify.notify_one();
1381 } else {
1382 let n_old = self.apply_error_seq.load(Ordering::SeqCst);
1388 let wait_ms: u64 = 100 * (1 << n_old);
1389 log::error!(
1390 "log apply failed at index={} (n={}). wait for {}ms",
1391 apply_index,
1392 n_old + 1,
1393 wait_ms
1394 );
1395 tokio::time::sleep(Duration::from_millis(wait_ms)).await;
1396 self.apply_error_seq.fetch_add(1, Ordering::SeqCst);
1397 }
1398 Ok(())
1399 }
    /// Build a new fold snapshot at `new_snapshot_index` by replaying the
    /// commands between the current snapshot and that index into the app via
    /// `fold_snapshot`, then queue the resulting snapshot entry for delayed
    /// insertion (it is not written immediately).
    async fn create_fold_snapshot(
        &self,
        new_snapshot_index: Index,
        core: Arc<RaftCore>,
    ) -> Result<()> {
        // Folding beyond what has been applied would fold unapplied state.
        assert!(new_snapshot_index <= self.last_applied.load(Ordering::SeqCst));

        // Only one compaction may run at a time.
        let _token = self.compaction_token.acquire().await;

        let cur_snapshot_index = self.get_snapshot_index();

        // Someone else already compacted at least this far; nothing to do.
        if new_snapshot_index <= cur_snapshot_index {
            return Ok(());
        }

        log::info!("create new fold snapshot at index {}", new_snapshot_index);
        let cur_snapshot_entry = self.storage.get_entry(cur_snapshot_index).await?.unwrap();
        if let Command::Snapshot { membership } = Command::deserialize(&cur_snapshot_entry.command)
        {
            let mut base_snapshot_index = cur_snapshot_index;
            let mut new_membership = membership;
            // Collect the commands in the fold range, keyed (and ordered) by
            // their log index.
            let mut commands = BTreeMap::new();
            for i in cur_snapshot_index + 1..=new_snapshot_index {
                let command = self.storage.get_entry(i).await?.unwrap().command;
                commands.insert(i, command);
            }
            let mut app_messages = vec![];
            for (i, command) in &commands {
                match Command::deserialize(&command) {
                    Command::ClusterConfiguration { membership } => {
                        // Membership tracks the latest configuration seen in
                        // the range.
                        new_membership = membership;
                    }
                    Command::Req {
                        core: false,
                        message,
                    } => {
                        app_messages.push(message);
                    }
                    Command::Snapshot { membership } => {
                        // A newer snapshot inside the range restarts the fold
                        // from that point; earlier messages are discarded.
                        base_snapshot_index = *i;
                        new_membership = membership;
                        app_messages = vec![];
                    }
                    _ => {}
                }
            }
            // Index 1 is the bootstrap snapshot: fold from scratch (None).
            let base_snapshot = if base_snapshot_index == 1 {
                None
            } else {
                Some(base_snapshot_index)
            };
            core.app
                .fold_snapshot(base_snapshot, app_messages, new_snapshot_index)
                .await?;
            // Rewrite the entry at the fold point as a Snapshot command
            // carrying the folded membership; clock fields are preserved.
            let new_snapshot = {
                let mut e = self.storage.get_entry(new_snapshot_index).await?.unwrap();
                e.command = Command::serialize(&Command::Snapshot {
                    membership: new_membership,
                });
                e
            };
            let delay = core.config.read().await.snapshot_insertion_delay();
            let _ = self.snapshot_queue.insert(new_snapshot, delay).await;
            Ok(())
        } else {
            // The entry at the snapshot index is always a Snapshot command.
            unreachable!()
        }
    }
1468 async fn run_gc(&self, core: Arc<RaftCore>) -> Result<()> {
1469 let l = self.storage.get_head_index().await?;
1470 let r = self.get_snapshot_index();
1471 log::debug!("gc {}..{}", l, r);
1472
1473 for i in l..r {
1474 self.ack_chans.write().await.remove(&i);
1477
1478 let entry = self.storage.get_entry(i).await?;
1479 if let Some(entry) = entry {
1480 match Command::deserialize(&entry.command) {
1481 Command::Snapshot { .. } => {
1482 core.app.delete_snapshot(i).await?;
1485 }
1486 _ => {}
1487 }
1488 self.storage.delete_entry(i).await?;
1491 }
1492 }
1493
1494 Ok(())
1495 }
1496}
1497
/// The tonic gRPC service type wrapping this crate's [`server::Server`];
/// produced by `make_raft_service`.
pub type RaftService = proto_compiled::raft_server::RaftServer<server::Server>;
1500
1501pub async fn make_raft_service(
1503 app: impl RaftApp,
1504 storage: impl RaftStorage,
1505 id: Uri,
1506 config: Config,
1507) -> RaftService {
1508 let core = RaftCore::new(app, storage, id, config).await;
1509
1510 tokio::spawn(thread::commit::run(Arc::clone(&core)));
1511 tokio::spawn(thread::compaction::run(Arc::clone(&core)));
1512 tokio::spawn(thread::election::run(Arc::clone(&core)));
1513 tokio::spawn(thread::execution::run(Arc::clone(&core)));
1514 tokio::spawn(thread::query_executor::run(Arc::clone(&core)));
1515 tokio::spawn(thread::gc::run(Arc::clone(&core)));
1516 tokio::spawn(thread::snapshot_installer::run(Arc::clone(&core)));
1517
1518 let server = server::Server { core };
1519 proto_compiled::raft_server::RaftServer::new(server)
1520}