lol_core/storage/
mod.rs

1use crate::{Clock, Command, Id, Index, Term};
2use anyhow::Result;
3use bytes::Bytes;
4
/// In-memory implementation backed by BTreeMap.
pub mod memory;

/// Persistent implementation backed by normal files.
pub mod file;

/// Persistent implementation backed by RocksDB.
///
/// Only compiled when the `rocksdb-backend` feature is enabled.
#[cfg(feature = "rocksdb-backend")]
#[cfg_attr(docsrs, doc(cfg(feature = "rocksdb-backend")))]
pub mod rocksdb;

// Declared without `pub`, so this module is private to the storage module tree.
mod persistency;
17
18/// Serialized Ballot.
19/// Ballot is a record of the previous vote.
20#[derive(Clone, Debug, PartialEq)]
21pub struct Ballot {
22    pub(crate) cur_term: Term,
23    pub(crate) voted_for: Option<Id>,
24}
25impl Ballot {
26    /// Create a new instance of a [`Ballot`].
27    pub fn new() -> Self {
28        Self {
29            cur_term: 0,
30            voted_for: None,
31        }
32    }
33}
34
/// Serialized log entry.
#[derive(Clone)]
pub struct Entry {
    /// Clock (a `term`/`index` pair) for the preceding entry.
    /// NOTE(review): meaning inferred from the field name — confirm against the log code.
    pub(crate) prev_clock: Clock,
    /// Clock (a `term`/`index` pair) for this entry.
    /// NOTE(review): meaning inferred from the field name — confirm against the log code.
    pub(crate) this_clock: Clock,
    /// Command in serialized form, as produced by `Command::serialize` and
    /// read back with `Command::deserialize`.
    pub(crate) command: Bytes,
}
42
/// The abstraction of the log storage.
/// Conceptually it is considered as a sequence of log entries and the recent vote.
///
/// Implementors must be `Sync + Send + 'static`; every operation takes `&self`
/// and reports failure through `anyhow::Result`.
#[async_trait::async_trait]
pub trait RaftStorage: Sync + Send + 'static {
    /// Store entry `e` at index `i`.
    ///
    /// The storage test in this module expects an insert at an already
    /// occupied index to replace the entry stored there.
    async fn insert_entry(&self, i: Index, e: Entry) -> Result<()>;
    /// Remove the entry at index `i`; a later `get_entry(i)` is expected to
    /// yield `None`.
    async fn delete_entry(&self, i: Index) -> Result<()>;
    /// Fetch the entry at index `i`, or `None` when nothing is stored there.
    async fn get_entry(&self, i: Index) -> Result<Option<Entry>>;
    /// Index of the first (lowest-index) entry currently stored.
    /// NOTE(review): the value for an empty storage is not shown in this file — confirm per backend.
    async fn get_head_index(&self) -> Result<Index>;
    /// Index of the last (highest-index) entry currently stored.
    ///
    /// The storage test in this module expects `0` for a storage without entries.
    async fn get_last_index(&self) -> Result<Index>;
    /// Save ballot `v`; a following `load_ballot` is expected to return it.
    async fn save_ballot(&self, v: Ballot) -> Result<()>;
    /// Load the saved ballot.
    ///
    /// The storage test in this module expects a fresh storage to return
    /// term `0` with no vote.
    async fn load_ballot(&self) -> Result<Ballot>;
}
55
56pub(crate) async fn find_last_snapshot_index<S: RaftStorage>(storage: &S) -> Result<Option<Index>> {
57    let last = storage.get_last_index().await?;
58    for i in (1..=last).rev() {
59        let e = storage.get_entry(i).await?.unwrap();
60        match Command::deserialize(&e.command) {
61            Command::Snapshot { .. } => return Ok(Some(i)),
62            _ => {}
63        }
64    }
65    Ok(None)
66}
67
/// Shared scenario used to exercise a [`RaftStorage`] implementation.
///
/// Starting from a fresh storage `s`, it checks ballot save/load, entry
/// insertion and overwrite, snapshot lookup, deletion of the entries before
/// the last snapshot, and a bulk append, asserting head/last indices along
/// the way.
#[cfg(test)]
async fn test_storage<S: RaftStorage>(s: S) -> Result<()> {
    use crate::Uri;
    use std::collections::HashSet;

    // Every entry in this scenario carries zeroed clocks; only the command differs.
    let entry_of = |cmd: &Command| Entry {
        prev_clock: Clock { term: 0, index: 0 },
        this_clock: Clock { term: 0, index: 0 },
        command: Command::serialize(cmd),
    };
    let noop = entry_of(&Command::Noop);
    let snapshot = entry_of(&Command::Snapshot {
        membership: HashSet::new(),
    });

    // Vote: a fresh storage reports the initial ballot, and a saved ballot is read back.
    let id: Id = "https://192.168.100.98:50001"
        .parse::<Uri>()
        .unwrap()
        .into();
    let initial = Ballot {
        cur_term: 0,
        voted_for: None,
    };
    assert_eq!(s.load_ballot().await?, initial);
    let voted = Ballot {
        cur_term: 1,
        voted_for: Some(id),
    };
    s.save_ballot(voted.clone()).await?;
    assert_eq!(s.load_ballot().await?, voted);

    // Empty log: no snapshot, last index 0, nothing at index 1.
    assert_eq!(find_last_snapshot_index(&s).await?, None);
    assert_eq!(s.get_last_index().await?, 0);
    assert!(s.get_entry(1).await?.is_none());

    // A snapshot at index 1, followed by plain entries at 2 through 5.
    s.insert_entry(1, snapshot.clone()).await?;
    assert_eq!(s.get_head_index().await?, 1);
    assert_eq!(s.get_last_index().await?, 1);
    assert_eq!(find_last_snapshot_index(&s).await?, Some(1));
    for i in 2..=5 {
        s.insert_entry(i, noop.clone()).await?;
    }
    assert_eq!(s.get_head_index().await?, 1);
    assert_eq!(s.get_last_index().await?, 5);

    // Overwrite index 4 and then index 2 with snapshots; the latest one stays at 4.
    s.insert_entry(4, snapshot.clone()).await?;
    assert_eq!(s.get_head_index().await?, 1);
    assert_eq!(find_last_snapshot_index(&s).await?, Some(4));
    s.insert_entry(2, snapshot.clone()).await?;
    assert_eq!(find_last_snapshot_index(&s).await?, Some(4));

    assert!(s.get_entry(1).await?.is_some());
    // delete entries before the last snapshot
    for i in 1..=3 {
        s.delete_entry(i).await?;
    }
    assert_eq!(s.get_head_index().await?, 4);
    assert!(s.get_entry(1).await?.is_none());

    // Bulk append up to index 1000; the head must stay at the snapshot.
    for i in 6..=1000 {
        s.insert_entry(i, noop.clone()).await?;
    }
    assert_eq!(s.get_head_index().await?, 4);
    assert_eq!(s.get_last_index().await?, 1000);

    Ok(())
}