lol_core/lib.rs

1#![deny(unused_must_use)]
2#![cfg_attr(docsrs, feature(doc_cfg))]
3
//! Raft is a distributed consensus algorithm widely used
//! to build distributed applications such as etcd.
//! However, while it is more understandable than the notorious Paxos algorithm,
//! it is still difficult to write a correct and efficient implementation.
8//!
//! This library is a Raft implementation based on Tonic, a gRPC library built
//! on Tokio.
//! By exploiting gRPC features like streaming, inter-node log replication
//! and snapshot copying are very efficient.
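//!
//! A minimal usage sketch (the addresses are illustrative, and `app` / `storage` stand for
//! your own `RaftApp` and `RaftStorage` implementations):
//!
//! ```ignore
//! let config = ConfigBuilder::default().build().unwrap();
//! let id: Uri = "http://127.0.0.1:50000".parse().unwrap();
//! let service = make_raft_service(app, storage, id, config).await;
//! tonic::transport::Server::builder()
//!     .add_service(service)
//!     .serve("127.0.0.1:50000".parse().unwrap())
//!     .await
//!     .unwrap();
//! ```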
13
14use anyhow::anyhow;
15use anyhow::Result;
16use async_trait::async_trait;
17use bytes::Bytes;
18use derive_builder::Builder;
19use derive_more::{Display, FromStr};
20use futures::stream::StreamExt;
21use std::collections::{BTreeMap, HashSet};
22use std::sync::atomic::{AtomicU64, Ordering};
23use std::sync::Arc;
24use std::time::{Duration, Instant};
25use tokio::sync::{Mutex, Notify, RwLock, Semaphore};
26
27mod ack;
28mod core_message;
29#[cfg(feature = "gateway")]
30#[cfg_attr(docsrs, doc(cfg(feature = "gateway")))]
31/// Gateway to interact with the cluster.
32pub mod gateway;
33mod membership;
34mod proto;
35mod query_queue;
36mod quorum_join;
37mod server;
38#[cfg(feature = "simple")]
39#[cfg_attr(docsrs, doc(cfg(feature = "simple")))]
40/// Simplified `RaftApp`.
41pub mod simple;
42mod snapshot;
43/// The abstraction of the log storage and some implementations.
44pub mod storage;
45mod thread;
46mod thread_drop;
47
48use ack::Ack;
49use storage::RaftStorage;
50use tonic::transport::Endpoint;
51
52mod proto_compiled {
53    pub use crate::proto::lol_core::*;
54}
55
56/// Available message types for interaction with the cluster.
57pub mod api {
58    pub use crate::proto_compiled::{
59        AddServerRep, AddServerReq, ApplyRep, ApplyReq, ClusterInfoRep, ClusterInfoReq, CommitRep,
60        CommitReq, GetConfigRep, GetConfigReq, RemoveServerRep, RemoveServerReq, StatusRep,
61        StatusReq, TimeoutNowRep, TimeoutNowReq, TuneConfigRep, TuneConfigReq,
62    };
63}
64
65pub use crate::proto_compiled::raft_client::RaftClient;
66
67use storage::{Ballot, Entry};
68
69/// Decision to make a new snapshot.
70pub enum MakeSnapshot {
    /// No snapshot will be made.
    None,
    /// A copy snapshot has already been made by the `RaftApp`.
    /// Returning this requests `RaftCore` to commit a snapshot entry.
    CopySnapshot,
    /// Returning this requests `RaftCore` to make a fold snapshot
    /// and then commit a snapshot entry.
    FoldSnapshot,
79}
80
/// The abstraction of the user-defined application.
///
/// Note about error handling:
/// In Raft, the fundamental rule is that the same log must result in the same state.
/// This means applying a given log entry must produce the same result on every node:
/// it is not allowed for the same entry to succeed on some node and fail on another.
/// Therefore, a function that may change the state (e.g. `process_write`) should not fail
/// if there is any chance that another node succeeds in applying the same entry.
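///
/// # Example (sketch)
///
/// A minimal, hypothetical `RaftApp` that keeps no state and never snapshots, shown only
/// to illustrate the trait shape; a real application persists its state and implements
/// the snapshot methods properly.
///
/// ```ignore
/// struct NopApp;
///
/// #[async_trait]
/// impl RaftApp for NopApp {
///     async fn process_read(&self, _request: &[u8]) -> Result<Vec<u8>> {
///         Ok(vec![])
///     }
///     async fn process_write(
///         &self,
///         _request: &[u8],
///         _entry_index: Index,
///     ) -> Result<(Vec<u8>, MakeSnapshot)> {
///         Ok((vec![], MakeSnapshot::None))
///     }
///     async fn install_snapshot(&self, _snapshot: Option<Index>) -> Result<()> {
///         Ok(())
///     }
///     async fn fold_snapshot(
///         &self,
///         _old_snapshot: Option<Index>,
///         _requests: Vec<&[u8]>,
///         _snapshot_index: Index,
///     ) -> Result<()> {
///         Ok(())
///     }
///     async fn save_snapshot(&self, _st: SnapshotStream, _snapshot_index: Index) -> Result<()> {
///         Ok(())
///     }
///     async fn open_snapshot(&self, _x: Index) -> Result<SnapshotStream> {
///         let st: SnapshotStream = Box::pin(futures::stream::empty::<anyhow::Result<Bytes>>());
///         Ok(st)
///     }
///     async fn delete_snapshot(&self, _x: Index) -> Result<()> {
///         Ok(())
///     }
/// }
/// ```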
89#[async_trait]
90pub trait RaftApp: Sync + Send + 'static {
91    /// Process read request.
92    /// This operation should **not** change the state of the application.
93    async fn process_read(&self, request: &[u8]) -> Result<Vec<u8>>;
94
95    /// Process write request.
96    /// This may change the state of the application.
97    ///
98    /// This function may return `MakeSnapshot` to make a new snapshot.
99    /// The snapshot entry corresponding to the copy snapshot is not guaranteed to be made
100    /// due to possible I/O errors, etc.
101    async fn process_write(
102        &self,
103        request: &[u8],
104        entry_index: Index,
105    ) -> Result<(Vec<u8>, MakeSnapshot)>;
106
    /// A special variant of `process_write`, called when the entry is a snapshot entry.
    /// `snapshot` is `None` when the apply index is 1, which is the youngest possible snapshot.
109    async fn install_snapshot(&self, snapshot: Option<Index>) -> Result<()>;
110
111    /// This function is called from the compaction thread.
    /// It should compute a new snapshot by folding the subsequent log entries into the `old_snapshot`.
113    async fn fold_snapshot(
114        &self,
115        old_snapshot: Option<Index>,
116        requests: Vec<&[u8]>,
117        snapshot_index: Index,
118    ) -> Result<()>;
119
    /// Make a snapshot resource from an incoming `SnapshotStream`.
121    async fn save_snapshot(&self, st: SnapshotStream, snapshot_index: Index) -> Result<()>;
122
123    /// Open a `SnapshotStream` from a snapshot resource.
124    async fn open_snapshot(&self, x: Index) -> Result<SnapshotStream>;
125
126    /// Delete a snapshot resource.
127    async fn delete_snapshot(&self, x: Index) -> Result<()>;
128}
129
130/// The core-level stream type. It is just a stream of byte chunks.
131/// The length of each chunk may be different.
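///
/// A sketch of constructing one from in-memory chunks (illustrative only):
///
/// ```ignore
/// let chunks: Vec<anyhow::Result<Bytes>> =
///     vec![Ok(Bytes::from_static(b"part-1")), Ok(Bytes::from_static(b"part-2"))];
/// let st: SnapshotStream = Box::pin(futures::stream::iter(chunks));
/// ```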
132pub type SnapshotStream =
133    std::pin::Pin<Box<dyn futures::stream::Stream<Item = anyhow::Result<Bytes>> + Send>>;
134
135type Term = u64;
136/// Log entry index.
137pub type Index = u64;
138#[derive(Clone, Copy, Eq)]
139struct Clock {
140    term: Term,
141    index: Index,
142}
143impl PartialEq for Clock {
144    fn eq(&self, that: &Self) -> bool {
145        self.term == that.term && self.index == that.index
146    }
147}
148
149pub use tonic::transport::Uri;
150
151#[derive(
152    serde::Serialize, serde::Deserialize, Clone, PartialEq, Eq, Hash, Debug, Display, FromStr,
153)]
154struct Id(#[serde(with = "http_serde::uri")] Uri);
155impl Id {
156    fn uri(&self) -> &Uri {
157        &self.0
158    }
159}
160impl From<Uri> for Id {
161    fn from(x: Uri) -> Id {
162        Id(x)
163    }
164}
165
166#[cfg(test)]
167mod tests {
168    use super::*;
169
170    #[test]
171    fn test_uri() {
172        let a = "https://192.168.1.1:8000";
173        let aa: Uri = a.parse().unwrap();
174        let b = "https://192.168.1.1:8000/";
175        let bb: Uri = b.parse().unwrap();
        // equality is determined by the canonical form.
177        assert_eq!(aa, bb);
        // the string representation is the canonical form.
179        assert_eq!(aa.to_string(), b);
180        assert_ne!(a, b);
181    }
182
183    #[test]
184    fn test_id() {
185        let a = "https://192.168.100.200:8080";
186        let b: Id = a.parse().unwrap();
187        let c = a.to_string();
188        assert_eq!(a, c);
189        let d = c.parse().unwrap();
190        assert_eq!(b, d);
191    }
192
193    #[test]
194    fn test_id_serde() {
195        let a: Id = "http://192.168.1.1:8080".parse().unwrap();
196        let b = bincode::serialize(&a).unwrap();
197        let c = bincode::deserialize(&b).unwrap();
198        assert_eq!(a, c);
199    }
200}
201
202#[derive(serde::Serialize, serde::Deserialize)]
203enum Command<'a> {
204    Noop,
205    Snapshot {
206        membership: HashSet<Id>,
207    },
208    ClusterConfiguration {
209        membership: HashSet<Id>,
210    },
211    Req {
212        core: bool,
213        #[serde(with = "serde_bytes")]
214        message: &'a [u8],
215    },
216}
217impl<'a> Command<'a> {
218    fn serialize(x: &Command) -> Bytes {
219        bincode::serialize(x).unwrap().into()
220    }
221    fn deserialize(x: &[u8]) -> Command {
222        bincode::deserialize(x).unwrap()
223    }
224}
225#[derive(Clone, Copy)]
226enum ElectionState {
227    Leader,
228    Candidate,
229    Follower,
230}
231
232/// Configuration.
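///
/// A construction sketch using the builder generated by `derive_builder`
/// (the value shown is just the default):
///
/// ```ignore
/// let config = ConfigBuilder::default()
///     .compaction_interval_sec(300)
///     .build()
///     .unwrap();
/// ```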
233#[derive(Builder)]
234pub struct Config {
235    #[builder(default = "300")]
    /// Compaction will run at this interval (in seconds).
    /// You can set this to 0 to disable fold snapshots.
    /// This parameter can be updated online.
    /// default: 300
240    compaction_interval_sec: u64,
241}
242impl Config {
243    fn compaction_interval(&self) -> Duration {
244        Duration::from_secs(self.compaction_interval_sec)
245    }
246    fn snapshot_insertion_delay(&self) -> Duration {
        // If fold compaction is disabled, snapshot insertion is delayed by 10 seconds.
        // This should be a reasonable tolerance for most applications.
249        let minv = Duration::from_secs(10);
250        let v = Duration::from_secs(self.compaction_interval_sec / 2);
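        // e.g. a 300s compaction interval gives a 150s delay; a disabled interval (0) falls back to the 10s minimum.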
251        std::cmp::max(minv, v)
252    }
253}
254
255struct FailureDetector {
256    watch_id: Id,
257    detector: phi_detector::PingWindow,
258}
259impl FailureDetector {
260    fn watch(id: Id) -> Self {
261        Self {
262            watch_id: id,
263            detector: phi_detector::PingWindow::new(&[Duration::from_secs(1)], Instant::now()),
264        }
265    }
266}
267
268/// RaftCore is the heart of the Raft system.
/// It does everything Raft should do: election, dynamic membership change,
/// log replication, streaming snapshots, and interacting with the user-defined `RaftApp`.
271struct RaftCore {
272    id: Id,
273    app: Box<dyn RaftApp>,
274    query_queue: Mutex<query_queue::QueryQueue>,
275    log: Log,
276    election_state: RwLock<ElectionState>,
277    cluster: RwLock<membership::Cluster>,
278    config: RwLock<Config>,
279    vote_token: Semaphore,
    // Until the noop entry is committed and the safe term is incremented,
    // no new entry in the current term is appended to the log.
282    safe_term: AtomicU64,
    // A new membership entry should not be appended until commit_index passes this line.
284    membership_barrier: AtomicU64,
285
286    failure_detector: RwLock<FailureDetector>,
287}
288impl RaftCore {
289    async fn new(
290        app: impl RaftApp,
291        storage: impl RaftStorage,
292        id: Uri,
293        config: Config,
294    ) -> Arc<Self> {
295        let id: Id = id.into();
296        let init_cluster = membership::Cluster::empty(id.clone()).await;
297        let (membership_index, init_membership) =
298            Self::find_last_membership(&storage).await.unwrap();
299        let init_log = Log::new(storage).await;
300        let fd = FailureDetector::watch(id.clone());
301        let r = Arc::new(Self {
302            app: Box::new(app),
303            query_queue: Mutex::new(query_queue::QueryQueue::new()),
304            id,
305            log: init_log,
306            election_state: RwLock::new(ElectionState::Follower),
307            cluster: RwLock::new(init_cluster),
308            config: RwLock::new(config),
309            vote_token: Semaphore::new(1),
310            safe_term: 0.into(),
311            membership_barrier: 0.into(),
312            failure_detector: RwLock::new(fd),
313        });
314        log::info!(
315            "initial membership is {:?} at {}",
316            init_membership,
317            membership_index
318        );
319        r.set_membership(&init_membership, membership_index)
320            .await
321            .unwrap();
322        r
323    }
324    async fn find_last_membership<S: RaftStorage>(storage: &S) -> Result<(Index, HashSet<Id>)> {
325        let last = storage.get_last_index().await?;
326        let mut ret = (0, HashSet::new());
327        for i in (1..=last).rev() {
328            let e = storage.get_entry(i).await?.unwrap();
329            match Command::deserialize(&e.command) {
330                Command::Snapshot { membership } => {
331                    ret = (i, membership);
332                    break;
333                }
334                Command::ClusterConfiguration { membership } => {
335                    ret = (i, membership);
336                    break;
337                }
338                _ => {}
339            }
340        }
341        Ok(ret)
342    }
343    async fn init_cluster(self: &Arc<Self>) -> Result<()> {
344        let snapshot = Entry {
345            prev_clock: Clock { term: 0, index: 0 },
346            this_clock: Clock { term: 0, index: 1 },
347            command: Command::serialize(&Command::Snapshot {
348                membership: HashSet::new(),
349            }),
350        };
351        self.log.insert_snapshot(snapshot).await?;
352
353        let mut membership = HashSet::new();
354        membership.insert(self.id.clone());
355        let add_server = Entry {
356            prev_clock: Clock { term: 0, index: 1 },
357            this_clock: Clock { term: 0, index: 2 },
358            command: Command::serialize(&Command::ClusterConfiguration {
359                membership: membership.clone(),
360            }),
361        };
362        self.log.insert_entry(add_server).await?;
363        self.set_membership(&membership, 2).await?;
364
        // After this function is called,
        // this server immediately becomes the leader by voting for itself and advancing the commit index.
        // Consequently, when the initial install_snapshot is called this server is already the leader.
368        self.send_timeout_now(self.id.clone());
369
370        Ok(())
371    }
372    fn allow_new_membership_change(&self) -> bool {
373        self.log.commit_index.load(Ordering::SeqCst)
374            >= self.membership_barrier.load(Ordering::SeqCst)
375    }
376    async fn set_membership(
377        self: &Arc<Self>,
378        membership: &HashSet<Id>,
379        index: Index,
380    ) -> Result<()> {
381        log::info!("change membership to {:?}", membership);
382        self.cluster
383            .write()
384            .await
385            .set_membership(&membership, Arc::clone(&self))
386            .await?;
387        self.membership_barrier.store(index, Ordering::SeqCst);
388        Ok(())
389    }
390    async fn register_query(self: &Arc<Self>, core: bool, message: Bytes, ack: Ack) {
391        let query = query_queue::Query { core, message, ack };
392        self.query_queue
393            .lock()
394            .await
395            .register(self.log.commit_index.load(Ordering::SeqCst), query);
        // In case last_applied == commit_index and there are no subsequent entries after this line,
        // no notification of a change to last_applied will be made and this query would never be processed.
        // To avoid this, manually kick the execution of the query_queue here.
399        self.query_queue
400            .lock()
401            .await
402            .execute(
403                self.log.last_applied.load(Ordering::SeqCst),
404                Arc::clone(&self),
405            )
406            .await;
407    }
408}
409enum TryInsertResult {
410    Inserted,
411    Skipped,
412    Rejected,
413}
414struct LogStream {
415    sender_id: Id,
416    prev_log_term: Term,
417    prev_log_index: Index,
418    entries: std::pin::Pin<Box<dyn futures::stream::Stream<Item = LogStreamElem> + Send>>,
419}
420struct LogStreamElem {
421    term: Term,
422    index: Index,
423    command: Bytes,
424}
425
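// The outgoing gRPC stream is framed as one AppendStreamHeader element followed by
// zero or more AppendStreamEntry elements, each wrapped in an AppendEntryReq.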
426fn into_out_stream(
427    x: LogStream,
428) -> impl futures::stream::Stream<Item = crate::proto_compiled::AppendEntryReq> {
429    use crate::proto_compiled::{append_entry_req::Elem, AppendStreamEntry, AppendStreamHeader};
430    let header_stream = vec![Elem::Header(AppendStreamHeader {
431        sender_id: x.sender_id.to_string(),
432        prev_log_index: x.prev_log_index,
433        prev_log_term: x.prev_log_term,
434    })];
435    let header_stream = futures::stream::iter(header_stream);
436    let chunk_stream = x.entries.map(|e| {
437        Elem::Entry(AppendStreamEntry {
438            term: e.term,
439            index: e.index,
440            command: e.command,
441        })
442    });
443    header_stream
444        .chain(chunk_stream)
445        .map(|e| crate::proto_compiled::AppendEntryReq { elem: Some(e) })
446}
447// Replication
448impl RaftCore {
449    async fn change_membership(self: &Arc<Self>, command: Bytes, index: Index) -> Result<()> {
450        match Command::deserialize(&command) {
451            Command::Snapshot { membership } => {
452                self.set_membership(&membership, index).await?;
453            }
454            Command::ClusterConfiguration { membership } => {
455                self.set_membership(&membership, index).await?;
456            }
457            _ => {}
458        }
459        Ok(())
460    }
461    fn commit_safe_term(&self, term: Term) {
462        log::info!("noop entry for term {} is successfully committed", term);
463        self.safe_term.fetch_max(term, Ordering::SeqCst);
464    }
    // The leader calls this function to append a new entry to its log.
466    async fn queue_entry(self: &Arc<Self>, command: Bytes, ack: Option<Ack>) -> Result<()> {
467        let term = self.load_ballot().await?.cur_term;
        // safe_term is the term whose noop entry has been successfully committed.
469        let cur_safe_term = self.safe_term.load(Ordering::SeqCst);
470        if cur_safe_term < term {
            return Err(anyhow!(
                "noop entry for term {} isn't committed yet (safe_term = {})",
                term,
                cur_safe_term
            ));
476        }
477        // command.clone() is cheap because the message buffer is Bytes.
478        let append_index = self
479            .log
480            .append_new_entry(command.clone(), ack, term)
481            .await?;
482        self.log.replication_notify.notify_one();
483
484        // Change membership when cluster configuration is appended.
485        self.change_membership(command, append_index).await?;
486        Ok(())
487    }
488    // Follower calls this function when it receives entries from the leader.
489    async fn queue_received_entry(self: &Arc<Self>, mut req: LogStream) -> Result<bool> {
490        let mut prev_clock = Clock {
491            term: req.prev_log_term,
492            index: req.prev_log_index,
493        };
494        while let Some(e) = req.entries.next().await {
495            let entry = Entry {
496                prev_clock,
497                this_clock: Clock {
498                    term: e.term,
499                    index: e.index,
500                },
501                command: e.command,
502            };
503            let insert_index = entry.this_clock.index;
504            let command = entry.command.clone();
505            match self
506                .log
507                .try_insert_entry(entry, req.sender_id.clone(), Arc::clone(&self))
508                .await?
509            {
510                TryInsertResult::Inserted => {
511                    self.change_membership(command, insert_index).await?;
512                }
513                TryInsertResult::Skipped => {}
514                TryInsertResult::Rejected => {
515                    log::warn!("rejected append entry (clock={:?})", (e.term, e.index));
516                    return Ok(false);
517                }
518            }
519            prev_clock = Clock {
520                term: e.term,
521                index: e.index,
522            };
523        }
524        Ok(true)
525    }
526    async fn prepare_replication_stream(self: &Arc<Self>, l: Index, r: Index) -> Result<LogStream> {
527        let head = self.log.storage.get_entry(l).await?.unwrap();
528        let Clock {
529            term: prev_log_term,
530            index: prev_log_index,
531        } = head.prev_clock;
532        let Clock { term, index } = head.this_clock;
533        let e = LogStreamElem {
534            term,
535            index,
536            command: head.command,
537        };
538        let st1 = futures::stream::iter(vec![e]);
539        let core = Arc::clone(&self);
540        let st2 = async_stream::stream! {
541            for idx in l + 1..r {
542                let x = core.log.storage.get_entry(idx).await.unwrap().unwrap();
543                let Clock { term, index } = x.this_clock;
544                let e = LogStreamElem {
545                    term,
546                    index,
547                    command: x.command,
548                };
549                yield e;
550            }
551        };
552        let st = st1.chain(st2);
553        Ok(LogStream {
554            sender_id: self.id.clone(),
555            prev_log_term,
556            prev_log_index,
557            entries: Box::pin(st),
558        })
559    }
560    async fn advance_replication(self: &Arc<Self>, follower_id: Id) -> Result<bool> {
561        let peer = self
562            .cluster
563            .read()
564            .await
565            .peers
566            .get(&follower_id)
567            .unwrap()
568            .clone();
569
570        let old_progress = peer.progress;
571        let cur_last_log_index = self.log.get_last_log_index().await?;
572
573        // More entries to send?
574        let should_send = cur_last_log_index >= old_progress.next_index;
575        if !should_send {
576            return Ok(false);
577        }
578
        // The entries to send could have been deleted by previous compactions.
        // In this case, replication will restart from the current snapshot index.
581        let cur_snapshot_index = self.log.get_snapshot_index();
582        if old_progress.next_index < cur_snapshot_index {
583            log::warn!(
584                "entry not found at next_index (idx={}) for {}",
585                old_progress.next_index,
586                follower_id.uri(),
587            );
588            let new_progress = membership::ReplicationProgress::new(cur_snapshot_index);
589            let mut cluster = self.cluster.write().await;
590            cluster.peers.get_mut(&follower_id).unwrap().progress = new_progress;
591            return Ok(true);
592        }
593
594        let n_max_possible = cur_last_log_index - old_progress.next_index + 1;
595        let n = std::cmp::min(old_progress.next_max_cnt, n_max_possible);
596        assert!(n >= 1);
597
598        let in_stream = self
599            .prepare_replication_stream(old_progress.next_index, old_progress.next_index + n)
600            .await?;
601
602        let res: Result<_> = async {
603            let endpoint =
604                Endpoint::from(follower_id.uri().clone()).connect_timeout(Duration::from_secs(5));
605            let mut conn = RaftClient::connect(endpoint).await?;
606            let out_stream = into_out_stream(in_stream);
607            let res = conn.send_append_entry(out_stream).await?;
608            Ok(res)
609        }
610        .await;
611
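        // On success the batch size (next_max_cnt) doubles; on a log mismatch it resets to 1,
        // so replication backs off and then ramps up again.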
612        let mut incremented = false;
613        let new_progress = if res.is_ok() {
614            let res = res.unwrap();
615            match res.into_inner() {
616                crate::proto_compiled::AppendEntryRep { success: true, .. } => {
617                    incremented = true;
618                    membership::ReplicationProgress {
619                        match_index: old_progress.next_index + n - 1,
620                        next_index: old_progress.next_index + n,
621                        next_max_cnt: n * 2,
622                    }
623                }
624                crate::proto_compiled::AppendEntryRep {
625                    success: false,
626                    last_log_index,
627                } => membership::ReplicationProgress {
628                    match_index: old_progress.match_index,
629                    next_index: std::cmp::min(old_progress.next_index - 1, last_log_index + 1),
630                    next_max_cnt: 1,
631                },
632            }
633        } else {
634            old_progress
635        };
636
637        {
638            let mut cluster = self.cluster.write().await;
639            cluster.peers.get_mut(&follower_id).unwrap().progress = new_progress;
640        }
641        if incremented {
642            self.log.replication_notify.notify_one();
643        }
644
645        Ok(true)
646    }
647    async fn find_new_agreement(&self) -> Result<Index> {
648        let mut match_indices = vec![];
649
        // During leader stepdown, the leader is out of the membership,
        // but consensus on the membership change must still be reached in order to respond to the client.
652        let last_log_index = self.log.get_last_log_index().await?;
653        match_indices.push(last_log_index);
654
655        for (_, peer) in self.cluster.read().await.peers.clone() {
656            match_indices.push(peer.progress.match_index);
657        }
658
659        match_indices.sort();
660        match_indices.reverse();
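        // The median of the descending match indices is replicated on a majority.
        // e.g. with 5 nodes and sorted indices [9, 7, 5, 5, 3], mid = 2 and the agreement is 5.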
661        let mid = match_indices.len() / 2;
662        let new_agreement = match_indices[mid];
663        Ok(new_agreement)
664    }
665}
666// Snapshot
667impl RaftCore {
668    async fn fetch_snapshot(&self, snapshot_index: Index, to: Id) -> Result<()> {
669        let endpoint = Endpoint::from(to.uri().clone()).connect_timeout(Duration::from_secs(5));
670
671        let mut conn = RaftClient::connect(endpoint).await?;
672        let req = proto_compiled::GetSnapshotReq {
673            index: snapshot_index,
674        };
675        let res = conn.get_snapshot(req).await?;
676        let out_stream = res.into_inner();
677        let in_stream = Box::pin(snapshot::into_in_stream(out_stream));
678        self.app.save_snapshot(in_stream, snapshot_index).await?;
679        Ok(())
680    }
681    async fn make_snapshot_stream(&self, snapshot_index: Index) -> Result<Option<SnapshotStream>> {
682        let st = self.app.open_snapshot(snapshot_index).await?;
683        Ok(Some(st))
684    }
685}
686// Election
687impl RaftCore {
688    async fn save_ballot(&self, v: Ballot) -> Result<()> {
689        self.log.storage.save_ballot(v).await
690    }
691    async fn load_ballot(&self) -> Result<Ballot> {
692        self.log.storage.load_ballot().await
693    }
694    async fn detect_election_timeout(&self) -> bool {
695        let fd = &self.failure_detector.read().await.detector;
696        let normal_dist = fd.normal_dist();
697        let phi = normal_dist.phi(Instant::now() - fd.last_ping());
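        // phi is the suspicion level of the phi accrual failure detector: it grows as the time
        // since the last heartbeat exceeds the observed ping intervals. 3 is the threshold at
        // which the leader is suspected to be down and an election timeout is declared.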
698        phi > 3.
699    }
700    async fn receive_vote(
701        &self,
702        candidate_term: Term,
703        candidate_id: Id,
704        candidate_last_log_clock: Clock,
705        force_vote: bool,
706        pre_vote: bool,
707    ) -> Result<bool> {
708        let allow_side_effects = !pre_vote;
709
710        if !force_vote {
711            if !self.detect_election_timeout().await {
712                return Ok(false);
713            }
714        }
715
716        let _vote_guard = self.vote_token.acquire().await;
717
718        let mut ballot = self.load_ballot().await?;
719        if candidate_term < ballot.cur_term {
720            log::warn!("candidate term is older. reject vote");
721            return Ok(false);
722        }
723
724        if candidate_term > ballot.cur_term {
725            log::warn!("received newer term. reset vote");
726            ballot.cur_term = candidate_term;
727            ballot.voted_for = None;
728            if allow_side_effects {
729                *self.election_state.write().await = ElectionState::Follower;
730            }
731        }
732
733        let cur_last_index = self.log.get_last_log_index().await?;
734
        // Suppose we have 3 in-memory nodes ND0-2, ND0 is initially the leader,
        // the log is fully replicated between the nodes and there are no incoming entries.
        // Suddenly, ND0 and ND1 crash and are soon rebooted (losing their in-memory logs).
        // In this case, ND2 should become the leader by getting a vote from either ND0 or ND1.
        // This is why the weakest clock (0,0) is used here when there is no entry in the log.
740        let this_last_log_clock = self
741            .log
742            .storage
743            .get_entry(cur_last_index)
744            .await?
745            .map(|x| x.this_clock)
746            .unwrap_or(Clock { term: 0, index: 0 });
747
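        // The candidate wins the log comparison if its last log term is newer, or the terms are
        // equal and its last index is at least as large (e.g. a candidate at (term=3, index=10)
        // beats a local log at (term=2, index=50) because the term dominates).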
748        let candidate_win = match candidate_last_log_clock.term.cmp(&this_last_log_clock.term) {
749            std::cmp::Ordering::Greater => true,
750            std::cmp::Ordering::Equal => {
751                candidate_last_log_clock.index >= this_last_log_clock.index
752            }
753            std::cmp::Ordering::Less => false,
754        };
755
756        if !candidate_win {
757            log::warn!("candidate clock is older. reject vote");
758            if allow_side_effects {
759                self.save_ballot(ballot).await?;
760            }
761            return Ok(false);
762        }
763
764        let grant = match &ballot.voted_for {
765            None => {
766                ballot.voted_for = Some(candidate_id.clone());
767                true
768            }
            Some(id) => id == &candidate_id,
776        };
777
778        if allow_side_effects {
779            self.save_ballot(ballot).await?;
780        }
781        log::info!(
782            "voted response to {} = grant: {}",
783            candidate_id.uri(),
784            grant
785        );
786        Ok(grant)
787    }
788    async fn request_votes(
789        self: &Arc<Self>,
790        aim_term: Term,
791        force_vote: bool,
792        pre_vote: bool,
793    ) -> Result<bool> {
794        let (others, remaining) = {
795            let membership = self.cluster.read().await.membership.clone();
796            let n = membership.len();
797            let majority = (n / 2) + 1;
798            let include_self = membership.contains(&self.id);
799            let mut others = vec![];
800            for id in membership {
801                if id != self.id {
802                    others.push(id);
803                }
804            }
805            // -1 = self vote
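            // e.g. with n = 5 and include_self, majority = 3, so 2 more votes are needed from the others.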
806            let m = if include_self { majority - 1 } else { majority };
807            (others, m)
808        };
809
810        let last_log_index = self.log.get_last_log_index().await?;
811        let last_log_clock = self
812            .log
813            .storage
814            .get_entry(last_log_index)
815            .await?
816            .unwrap()
817            .this_clock;
818
819        // Let's get remaining votes out of others.
820        let mut vote_requests = vec![];
821        for endpoint in others {
822            let myid = self.id.clone();
823            vote_requests.push(async move {
824                let Clock {
825                    term: last_log_term,
826                    index: last_log_index,
827                } = last_log_clock;
828                let req = crate::proto_compiled::RequestVoteReq {
829                    term: aim_term,
830                    candidate_id: myid.to_string(),
831                    last_log_term,
832                    last_log_index,
                    // $4.2.3
                    // If force_vote is set, the receiving server accepts the vote request
                    // regardless of the heartbeat timeout; otherwise the vote request is
                    // dropped while it is still receiving heartbeats.
837                    force_vote,
838                    // $9.6 Preventing disruptions when a server rejoins the cluster
839                    // We recommend the Pre-Vote extension in deployments that would benefit from additional robustness.
840                    pre_vote,
841                };
842                let res: Result<_> = async {
843                    let endpoint =
844                        Endpoint::from(endpoint.uri().clone()).timeout(Duration::from_secs(5));
845                    let mut conn = RaftClient::connect(endpoint).await?;
846                    let res = conn.request_vote(req).await?;
847                    Ok(res)
848                }
849                .await;
850                match res {
851                    Ok(res) => res.into_inner().vote_granted,
852                    Err(_) => false,
853                }
854            });
855        }
856        let ok = quorum_join::quorum_join(remaining, vote_requests).await;
857        Ok(ok)
858    }
859    async fn after_votes(self: &Arc<Self>, aim_term: Term, ok: bool) -> Result<()> {
860        if ok {
861            log::info!("got enough votes from the cluster. promoted to leader");
862
            // As soon as the node becomes the leader, append a noop entry with the new term to be replicated.
864            let index = self
865                .log
866                .append_new_entry(Command::serialize(&Command::Noop), None, aim_term)
867                .await?;
868            self.membership_barrier.store(index, Ordering::SeqCst);
869
870            // Initialize replication progress
871            {
872                let initial_progress =
873                    membership::ReplicationProgress::new(self.log.get_last_log_index().await?);
874                let mut cluster = self.cluster.write().await;
875                for (_, peer) in &mut cluster.peers {
876                    peer.progress = initial_progress.clone();
877                }
878            }
879
880            *self.election_state.write().await = ElectionState::Leader;
881        } else {
882            log::info!("failed to become leader. now back to follower");
883            *self.election_state.write().await = ElectionState::Follower;
884        }
885        Ok(())
886    }
887    async fn try_promote(self: &Arc<Self>, force_vote: bool) -> Result<()> {
888        // Pre-Vote phase.
889        let pre_aim_term = {
890            let _ballot_guard = self.vote_token.acquire().await;
891            let ballot = self.load_ballot().await?;
892            ballot.cur_term + 1
893        };
894
895        log::info!("start pre-vote. try promote at term {}", pre_aim_term);
896        let ok = self
897            .request_votes(pre_aim_term, force_vote, true)
898            .await
899            .unwrap_or(false);
900        // If pre-vote failed, do nothing and return.
901        if !ok {
902            log::info!("pre-vote failed for term {}", pre_aim_term);
903            return Ok(());
904        }
905
906        // Vote to self
907        let aim_term = {
908            let ballot_guard = self.vote_token.acquire().await;
909            let mut new_ballot = self.load_ballot().await?;
910            let aim_term = new_ballot.cur_term + 1;
911
            // If the aim term has changed, no election starts (similar to a failed compare-and-swap).
            // This could happen if this node has received TimeoutNow.
914            if aim_term != pre_aim_term {
915                return Ok(());
916            }
917
918            new_ballot.cur_term = aim_term;
919            new_ballot.voted_for = Some(self.id.clone());
920
921            self.save_ballot(new_ballot).await?;
922            drop(ballot_guard);
923
            // Becoming Candidate prevents this node from starting another election during this one.
925            *self.election_state.write().await = ElectionState::Candidate;
926            aim_term
927        };
928
929        log::info!("start election. try promote at term {}", aim_term);
930
931        // Try to promote at the term.
        // Failure of any I/O operation during the election is considered an election failure.
933        let ok = self
934            .request_votes(aim_term, force_vote, false)
935            .await
936            .unwrap_or(false);
937        self.after_votes(aim_term, ok).await?;
938
939        Ok(())
940    }
941    async fn send_heartbeat(&self, follower_id: Id) -> Result<()> {
942        let endpoint = Endpoint::from(follower_id.uri().clone()).timeout(Duration::from_secs(5));
943        let req = {
944            let term = self.load_ballot().await?.cur_term;
945            proto_compiled::HeartbeatReq {
946                term,
947                leader_id: self.id.to_string(),
948                leader_commit: self.log.commit_index.load(Ordering::SeqCst),
949            }
950        };
951        if let Ok(mut conn) = RaftClient::connect(endpoint).await {
952            let res = conn.send_heartbeat(req).await;
953            if res.is_err() {
954                log::warn!("heartbeat to {} failed", follower_id.uri());
955            }
956        }
957        Ok(())
958    }
959    async fn record_heartbeat(&self) {
960        self.failure_detector
961            .write()
962            .await
963            .detector
964            .add_ping(Instant::now())
965    }
966    async fn reset_failure_detector(&self, leader_id: Id) {
967        let cur_watch_id = self.failure_detector.read().await.watch_id.clone();
968        if cur_watch_id != leader_id {
969            *self.failure_detector.write().await = FailureDetector::watch(leader_id);
970        }
971    }
972    async fn receive_heartbeat(
973        self: &Arc<Self>,
974        leader_id: Id,
975        leader_term: Term,
976        leader_commit: Index,
977    ) -> Result<()> {
978        let ballot_guard = self.vote_token.acquire().await;
979        let mut ballot = self.load_ballot().await?;
980        if leader_term < ballot.cur_term {
981            log::warn!("heartbeat is stale. rejected");
982            return Ok(());
983        }
984
985        self.record_heartbeat().await;
986        self.reset_failure_detector(leader_id.clone()).await;
987
988        if leader_term > ballot.cur_term {
989            log::warn!("received heartbeat with newer term. reset ballot");
990            ballot.cur_term = leader_term;
991            ballot.voted_for = None;
992            *self.election_state.write().await = ElectionState::Follower;
993        }
994
995        if ballot.voted_for != Some(leader_id.clone()) {
996            log::info!("learn the current leader ({})", leader_id.uri());
997            ballot.voted_for = Some(leader_id);
998        }
999
1000        self.save_ballot(ballot).await?;
1001        drop(ballot_guard);
1002
1003        let new_commit_index = std::cmp::min(leader_commit, self.log.get_last_log_index().await?);
1004        self.log
1005            .advance_commit_index(new_commit_index, Arc::clone(&self))
1006            .await?;
1007
1008        Ok(())
1009    }
1010    async fn transfer_leadership(&self) {
1011        let mut xs = vec![];
1012        let peers = self.cluster.read().await.peers.clone();
1013        for (id, peer) in peers {
1014            let progress = peer.progress;
1015            xs.push((progress.match_index, id));
1016        }
1017        xs.sort_by_key(|x| x.0);
1018
        // Choose the one with the highest match_index as the next leader.
1020        if let Some((_, id)) = xs.pop() {
1021            self.send_timeout_now(id);
1022        }
1023    }
1024    fn send_timeout_now(&self, id: Id) {
1025        tokio::spawn(async move {
1026            let endpoint = Endpoint::from(id.uri().clone()).timeout(Duration::from_secs(5));
1027            if let Ok(mut conn) = RaftClient::connect(endpoint).await {
1028                let req = proto_compiled::TimeoutNowReq {};
1029                let _ = conn.timeout_now(req).await;
1030            }
1031        });
1032    }
1033}
1034struct Log {
1035    storage: Box<dyn RaftStorage>,
1036    ack_chans: RwLock<BTreeMap<Index, Ack>>,
1037
1038    snapshot_index: AtomicU64, // Monotonic
1039    last_applied: AtomicU64,   // Monotonic
1040    commit_index: AtomicU64,   // Monotonic
1041
1042    append_token: Semaphore,
1043    commit_token: Semaphore,
1044    compaction_token: Semaphore,
1045
1046    append_notify: Notify,
1047    replication_notify: Notify,
1048    commit_notify: Notify,
1049    apply_notify: Notify,
1050
1051    applied_membership: Mutex<HashSet<Id>>,
1052    snapshot_queue: snapshot::SnapshotQueue,
1053
1054    apply_error_seq: AtomicU64,
1055}
1056impl Log {
1057    async fn new(storage: impl RaftStorage) -> Self {
1058        let snapshot_index = match storage::find_last_snapshot_index(&storage)
1059            .await
1060            .expect("failed to find initial snapshot index")
1061        {
1062            Some(x) => x,
1063            None => 0,
1064        };
        // When the storage is persistent, the initial commit_index and last_applied
        // should be set to just before the snapshot index.
1067        let start_index = if snapshot_index == 0 {
1068            0
1069        } else {
1070            snapshot_index - 1
1071        };
1072        Self {
1073            storage: Box::new(storage),
1074            ack_chans: RwLock::new(BTreeMap::new()),
1075
1076            snapshot_index: snapshot_index.into(),
1077            last_applied: start_index.into(),
1078            commit_index: start_index.into(),
1079
1080            append_token: Semaphore::new(1),
1081            commit_token: Semaphore::new(1),
1082            compaction_token: Semaphore::new(1),
1083
1084            append_notify: Notify::new(),
1085            replication_notify: Notify::new(),
1086            commit_notify: Notify::new(),
1087            apply_notify: Notify::new(),
1088
1089            applied_membership: Mutex::new(HashSet::new()),
1090            snapshot_queue: snapshot::SnapshotQueue::new(),
1091
1092            apply_error_seq: 0.into(),
1093        }
1094    }
1095    async fn get_last_log_index(&self) -> Result<Index> {
1096        self.storage.get_last_index().await
1097    }
1098    fn get_snapshot_index(&self) -> Index {
1099        self.snapshot_index.load(Ordering::SeqCst)
1100    }
1101    async fn append_new_entry(
1102        &self,
1103        command: Bytes,
1104        ack: Option<Ack>,
1105        term: Term,
1106    ) -> Result<Index> {
1107        let _token = self.append_token.acquire().await;
1108
1109        let cur_last_log_index = self.storage.get_last_index().await?;
1110        let prev_clock = self
1111            .storage
1112            .get_entry(cur_last_log_index)
1113            .await?
1114            .unwrap()
1115            .this_clock;
1116        let new_index = cur_last_log_index + 1;
1117        let this_clock = Clock {
1118            term,
1119            index: new_index,
1120        };
1121        let e = Entry {
1122            prev_clock,
1123            this_clock,
1124            command,
1125        };
1126        self.insert_entry(e).await?;
1127        if let Some(x) = ack {
1128            self.ack_chans.write().await.insert(new_index, x);
1129        }
1130        self.append_notify.notify_waiters();
1131        Ok(new_index)
1132    }
1133    async fn try_insert_entry(
1134        &self,
1135        entry: Entry,
1136        sender_id: Id,
1137        core: Arc<RaftCore>,
1138    ) -> Result<TryInsertResult> {
1139        let _token = self.append_token.acquire().await;
1140
1141        let Clock {
1142            term: _,
1143            index: prev_index,
1144        } = entry.prev_clock;
1145        if let Some(prev_clock) = self
1146            .storage
1147            .get_entry(prev_index)
1148            .await?
1149            .map(|x| x.this_clock)
1150        {
1151            if prev_clock != entry.prev_clock {
1152                return Ok(TryInsertResult::Rejected);
1153            }
1154        } else {
            // If the entry is a snapshot then we should insert it without consistency checks.
1156            // Old entries before the new snapshot will be garbage collected.
1157            let command = entry.command.clone();
1158            if std::matches!(Command::deserialize(&command), Command::Snapshot { .. }) {
1159                let Clock {
1160                    term: _,
1161                    index: snapshot_index,
1162                } = entry.this_clock;
1163                log::warn!(
1164                    "log is too old. replicated a snapshot (idx={}) from leader",
1165                    snapshot_index
1166                );
1167
                // No snapshot resource is defined for snapshot_index=1.
1169                if sender_id != core.id && snapshot_index > 1 {
1170                    if let Err(e) = core.fetch_snapshot(snapshot_index, sender_id.clone()).await {
1171                        log::error!(
1172                            "could not fetch app snapshot (idx={}) from sender {}",
1173                            snapshot_index,
1174                            sender_id.uri(),
1175                        );
1176                        return Err(e);
1177                    }
1178                }
1179                let inserted = self
1180                    .snapshot_queue
1181                    .insert(entry, Duration::from_millis(0))
1182                    .await;
1183                if !inserted.await {
1184                    anyhow::bail!("failed to insert snapshot entry (idx={})", snapshot_index);
1185                }
1186
1187                self.commit_index
1188                    .store(snapshot_index - 1, Ordering::SeqCst);
1189                self.last_applied
1190                    .store(snapshot_index - 1, Ordering::SeqCst);
1191
1192                return Ok(TryInsertResult::Inserted);
1193            } else {
1194                return Ok(TryInsertResult::Rejected);
1195            }
1196        }
1197
1198        let Clock {
1199            term: _,
1200            index: new_index,
1201        } = entry.this_clock;
1202
1203        if let Some(old_clock) = self
1204            .storage
1205            .get_entry(new_index)
1206            .await?
1207            .map(|e| e.this_clock)
1208        {
1209            if old_clock == entry.this_clock {
                // If there is an entry with the same term and index
                // then the entries should be identical, so skip the insertion.
1212                Ok(TryInsertResult::Skipped)
1213            } else {
1214                log::warn!("log conflicted at idx: {}", new_index);
1215
1216                let old_last_log_index = self.storage.get_last_index().await?;
1217                for idx in new_index..old_last_log_index {
1218                    self.ack_chans.write().await.remove(&idx);
1219                }
1220
1221                self.insert_entry(entry).await?;
1222                Ok(TryInsertResult::Inserted)
1223            }
1224        } else {
1225            self.insert_entry(entry).await?;
1226            Ok(TryInsertResult::Inserted)
1227        }
1228    }
1229    async fn insert_entry(&self, e: Entry) -> Result<()> {
1230        self.storage.insert_entry(e.this_clock.index, e).await?;
1231        Ok(())
1232    }
1233    async fn insert_snapshot(&self, e: Entry) -> Result<()> {
1234        let new_snapshot_index = e.this_clock.index;
1235        self.storage.insert_entry(e.this_clock.index, e).await?;
1236        self.snapshot_index
1237            .fetch_max(new_snapshot_index, Ordering::SeqCst);
1238        Ok(())
1239    }
1240    async fn advance_commit_index(&self, new_agreement: Index, core: Arc<RaftCore>) -> Result<()> {
1241        let _token = self.commit_token.acquire().await;
1242
1243        let old_agreement = self.commit_index.load(Ordering::SeqCst);
1244        if !(new_agreement > old_agreement) {
1245            return Ok(());
1246        }
1247
1248        for i in old_agreement + 1..=new_agreement {
1249            let e = self.storage.get_entry(i).await?.unwrap();
1250            let term = e.this_clock.term;
1251            match Command::deserialize(&e.command) {
1252                Command::ClusterConfiguration { membership } => {
1253                    // Leader stepdown should happen iff the last membership change doesn't contain the leader.
                    // This code is safe because doing or not doing the leadership transfer doesn't affect correctness
                    // (IOW, it is only a hint); at worst it briefly confuses the leadership, causing a short downtime.
1256                    let remove_this_node = !membership.contains(&core.id);
1257                    let is_last_membership_change =
1258                        i == core.membership_barrier.load(Ordering::SeqCst);
1259                    let is_leader =
1260                        std::matches!(*core.election_state.read().await, ElectionState::Leader);
1261                    if remove_this_node && is_last_membership_change && is_leader {
1262                        *core.election_state.write().await = ElectionState::Follower;
1263
                        // If the leader steps down, choose one of the follower nodes to
                        // become a candidate immediately so the downtime is shorter.
1266                        core.transfer_leadership().await;
1267                    }
1268                }
1269                Command::Noop => {
1270                    core.commit_safe_term(term);
1271                }
1272                _ => {}
1273            }
1274
1275            let mut ack_chans = self.ack_chans.write().await;
1276            if !ack_chans.contains_key(&i) {
1277                continue;
1278            }
1279
1280            let ack = ack_chans.get(&i).unwrap();
1281            if std::matches!(ack, Ack::OnCommit(_)) {
1282                if let Ack::OnCommit(tx) = ack_chans.remove(&i).unwrap() {
1283                    let _ = tx.send(ack::CommitOk);
1284                }
1285            }
1286        }
1287
1288        log::debug!("commit_index {} -> {}", old_agreement, new_agreement);
1289        self.commit_index.store(new_agreement, Ordering::SeqCst);
1290        self.commit_notify.notify_one();
1291        Ok(())
1292    }
1293    async fn advance_last_applied(&self, raft_core: Arc<RaftCore>) -> Result<()> {
1294        let (apply_index, apply_entry, command) = {
1295            let apply_index = self.last_applied.load(Ordering::SeqCst) + 1;
1296            let mut e = self.storage.get_entry(apply_index).await?.unwrap();
1297            let command = std::mem::take(&mut e.command);
1298            (apply_index, e, command)
1299        };
1300        let ok = match Command::deserialize(&command) {
1301            Command::Snapshot { membership } => {
1302                log::info!("install app snapshot");
1303                let snapshot = if apply_index == 1 {
1304                    None
1305                } else {
1306                    Some(apply_index)
1307                };
1308                let res = raft_core.app.install_snapshot(snapshot).await;
1309                log::info!("install app snapshot (complete)");
1310                let success = res.is_ok();
1311                if success {
1312                    *self.applied_membership.lock().await = membership;
1313                    true
1314                } else {
1315                    false
1316                }
1317            }
1318            Command::Req { core, ref message } => {
1319                assert_eq!(core, false);
1320                let res = raft_core.app.process_write(message, apply_index).await;
1321                match res {
1322                    Ok((msg, make_snapshot)) => {
1323                        let mut ack_chans = self.ack_chans.write().await;
1324                        if ack_chans.contains_key(&apply_index) {
1325                            let ack = ack_chans.get(&apply_index).unwrap();
1326                            if std::matches!(ack, Ack::OnApply(_)) {
1327                                if let Ack::OnApply(tx) = ack_chans.remove(&apply_index).unwrap() {
1328                                    let _ = tx.send(ack::ApplyOk(msg));
1329                                }
1330                            }
1331                        }
1332
1333                        match make_snapshot {
1334                            MakeSnapshot::CopySnapshot => {
                                // Now that the RaftApp has already made a snapshot resource, we commit the snapshot entry.
1336                                let snapshot_entry = Entry {
1337                                    command: Command::serialize(&Command::Snapshot {
1338                                        membership: self.applied_membership.lock().await.clone(),
1339                                    }),
1340                                    ..apply_entry
1341                                };
1342                                let delay =
1343                                    raft_core.config.read().await.snapshot_insertion_delay();
1344                                log::info!(
1345                                    "copy snapshot is made and will be inserted in {:?}",
1346                                    delay
1347                                );
1348
1349                                let _ = self.snapshot_queue.insert(snapshot_entry, delay).await;
1350                            }
1351                            MakeSnapshot::FoldSnapshot => {
1352                                tokio::spawn(async move {
1353                                    let _ = raft_core
1354                                        .log
1355                                        .create_fold_snapshot(apply_index, Arc::clone(&raft_core))
1356                                        .await;
1357                                });
1358                            }
1359                            MakeSnapshot::None => {}
1360                        }
1361                        true
1362                    }
1363                    Err(e) => {
1364                        log::error!("log apply error: {} (core={})", e, core);
1365                        false
1366                    }
1367                }
1368            }
1369            Command::ClusterConfiguration { membership } => {
1370                *self.applied_membership.lock().await = membership;
1371                true
1372            }
1373            _ => true,
1374        };
1375        if ok {
1376            self.apply_error_seq.store(0, Ordering::SeqCst);
1377
1378            log::debug!("last_applied -> {}", apply_index);
1379            self.last_applied.store(apply_index, Ordering::SeqCst);
1380            self.apply_notify.notify_one();
1381        } else {
            // We assume process_write typically fails due to
            // recoverable, temporary network/storage errors.
            // Otherwise it should return Ok so that the broken entry is skipped.
            // Retries are done with an adaptive penalty in between,
            // so these errors are more likely to recover.
1387            let n_old = self.apply_error_seq.load(Ordering::SeqCst);
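            // Exponential backoff: 100ms, 200ms, 400ms, ... doubling after each consecutive failure.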
1388            let wait_ms: u64 = 100 * (1 << n_old);
1389            log::error!(
1390                "log apply failed at index={} (n={}). wait for {}ms",
1391                apply_index,
1392                n_old + 1,
1393                wait_ms
1394            );
1395            tokio::time::sleep(Duration::from_millis(wait_ms)).await;
1396            self.apply_error_seq.fetch_add(1, Ordering::SeqCst);
1397        }
1398        Ok(())
1399    }
1400    async fn create_fold_snapshot(
1401        &self,
1402        new_snapshot_index: Index,
1403        core: Arc<RaftCore>,
1404    ) -> Result<()> {
1405        assert!(new_snapshot_index <= self.last_applied.load(Ordering::SeqCst));
1406
1407        let _token = self.compaction_token.acquire().await;
1408
1409        let cur_snapshot_index = self.get_snapshot_index();
1410
1411        if new_snapshot_index <= cur_snapshot_index {
1412            return Ok(());
1413        }
1414
1415        log::info!("create new fold snapshot at index {}", new_snapshot_index);
1416        let cur_snapshot_entry = self.storage.get_entry(cur_snapshot_index).await?.unwrap();
1417        if let Command::Snapshot { membership } = Command::deserialize(&cur_snapshot_entry.command)
1418        {
1419            let mut base_snapshot_index = cur_snapshot_index;
1420            let mut new_membership = membership;
1421            let mut commands = BTreeMap::new();
1422            for i in cur_snapshot_index + 1..=new_snapshot_index {
1423                let command = self.storage.get_entry(i).await?.unwrap().command;
1424                commands.insert(i, command);
1425            }
1426            let mut app_messages = vec![];
1427            for (i, command) in &commands {
1428                match Command::deserialize(&command) {
1429                    Command::ClusterConfiguration { membership } => {
1430                        new_membership = membership;
1431                    }
1432                    Command::Req {
1433                        core: false,
1434                        message,
1435                    } => {
1436                        app_messages.push(message);
1437                    }
1438                    Command::Snapshot { membership } => {
1439                        base_snapshot_index = *i;
1440                        new_membership = membership;
1441                        app_messages = vec![];
1442                    }
1443                    _ => {}
1444                }
1445            }
1446            let base_snapshot = if base_snapshot_index == 1 {
1447                None
1448            } else {
1449                Some(base_snapshot_index)
1450            };
1451            core.app
1452                .fold_snapshot(base_snapshot, app_messages, new_snapshot_index)
1453                .await?;
1454            let new_snapshot = {
1455                let mut e = self.storage.get_entry(new_snapshot_index).await?.unwrap();
1456                e.command = Command::serialize(&Command::Snapshot {
1457                    membership: new_membership,
1458                });
1459                e
1460            };
1461            let delay = core.config.read().await.snapshot_insertion_delay();
1462            let _ = self.snapshot_queue.insert(new_snapshot, delay).await;
1463            Ok(())
1464        } else {
1465            unreachable!()
1466        }
1467    }
1468    async fn run_gc(&self, core: Arc<RaftCore>) -> Result<()> {
1469        let l = self.storage.get_head_index().await?;
1470        let r = self.get_snapshot_index();
1471        log::debug!("gc {}..{}", l, r);
1472
1473        for i in l..r {
1474            // Remove remaining ack?
1475            // Not sure it really exists.
1476            self.ack_chans.write().await.remove(&i);
1477
1478            let entry = self.storage.get_entry(i).await?;
1479            if let Some(entry) = entry {
1480                match Command::deserialize(&entry.command) {
1481                    Command::Snapshot { .. } => {
1482                        // Delete snapshot entry.
                        // There should be a snapshot resource associated with this snapshot entry.
1484                        core.app.delete_snapshot(i).await?;
1485                    }
1486                    _ => {}
1487                }
                // Delete the entry,
                // but only after everything else has been successfully deleted.
1490                self.storage.delete_entry(i).await?;
1491            }
1492        }
1493
1494        Ok(())
1495    }
1496}
1497
1498/// A Raft implementation of `tower::Service`.
1499pub type RaftService = proto_compiled::raft_server::RaftServer<server::Server>;
1500
1501/// Make a `RaftService`.
1502pub async fn make_raft_service(
1503    app: impl RaftApp,
1504    storage: impl RaftStorage,
1505    id: Uri,
1506    config: Config,
1507) -> RaftService {
1508    let core = RaftCore::new(app, storage, id, config).await;
1509
1510    tokio::spawn(thread::commit::run(Arc::clone(&core)));
1511    tokio::spawn(thread::compaction::run(Arc::clone(&core)));
1512    tokio::spawn(thread::election::run(Arc::clone(&core)));
1513    tokio::spawn(thread::execution::run(Arc::clone(&core)));
1514    tokio::spawn(thread::query_executor::run(Arc::clone(&core)));
1515    tokio::spawn(thread::gc::run(Arc::clone(&core)));
1516    tokio::spawn(thread::snapshot_installer::run(Arc::clone(&core)));
1517
1518    let server = server::Server { core };
1519    proto_compiled::raft_server::RaftServer::new(server)
1520}