1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
use crate::{Clock, Term, Index, Id};
use std::collections::BTreeSet;
use bytes::Bytes;
// Storage backend submodule; its contents are not visible from this file.
// NOTE(review): presumably the in-memory `RaftStorage` implementation — confirm.
pub mod memory;
// `disk` is compiled only when the `persistency` feature is enabled. When the
// `docsrs` cfg is set, the item additionally carries a `doc(cfg(...))`
// annotation naming that feature.
#[cfg(feature = "persistency")]
#[cfg_attr(docsrs, doc(cfg(feature = "persistency")))]
pub mod disk;
/// Ballot record written with [`RaftStorage::save_ballot`] and read back with
/// [`RaftStorage::load_ballot`].
#[derive(Clone, Debug, PartialEq)]
pub struct Ballot {
/// Term number held by this ballot.
/// NOTE(review): presumably the node's current Raft term — confirm.
pub(crate) cur_term: Term,
/// Id recorded as voted for, or `None` when no vote is recorded.
pub(crate) voted_for: Option<Id>,
}
impl Ballot {
    /// Builds the initial ballot: term `0` with no vote recorded.
    fn new() -> Self {
        let cur_term = 0;
        let voted_for = None;
        Self {
            cur_term,
            voted_for,
        }
    }
}
/// A log record as passed to and returned from a [`RaftStorage`]
/// implementation; the same type is used by both `insert_entry` and
/// `insert_snapshot`.
#[derive(Clone)]
pub struct Entry {
/// NOTE(review): presumably the clock of the entry preceding this one — confirm.
pub(crate) prev_clock: Clock,
/// NOTE(review): presumably the clock (term/index pair) of this entry itself — confirm.
pub(crate) this_clock: Clock,
/// Payload bytes carried by the entry.
pub(crate) command: Bytes,
}
/// Asynchronous storage interface covering log entries, the ballot and
/// snapshot tags.
///
/// Implementors must be `Sync + Send + 'static`, and every method reports
/// failure through `anyhow::Result`. The behavioral notes on individual
/// methods are taken from the assertions in this file's `test_storage`
/// helper; anything beyond that is marked as needing confirmation.
#[async_trait::async_trait]
pub trait RaftStorage: Sync + Send + 'static {
/// Deletes stored entries located before index `r`.
///
/// The storage test expects entry 1 to be absent after `delete_before(4)`.
/// NOTE(review): whether the entry at `r` itself is kept is not visible
/// here — confirm against the implementations.
async fn delete_before(&self, r: Index) -> anyhow::Result<()> ;
/// Stores `e` at index `i` as a snapshot entry.
///
/// The storage test expects that afterwards the entry is readable through
/// `get_entry`, that the snapshot index becomes `i` when `i` is higher
/// than the current snapshot index, and that inserting at a lower index
/// leaves the snapshot index unchanged.
async fn insert_snapshot(&self, i: Index, e: Entry) -> anyhow::Result<()>;
/// Stores `e` at index `i`.
///
/// The storage test expects `get_last_index` to report the highest index
/// inserted so far.
async fn insert_entry(&self, i: Index, e: Entry) -> anyhow::Result<()> ;
/// Returns the entry stored at index `i`, or `None` when there is none.
async fn get_entry(&self, i: Index) -> anyhow::Result<Option<Entry>>;
/// Returns the current snapshot index; the storage test expects `0` on a
/// fresh storage.
async fn get_snapshot_index(&self) -> anyhow::Result<Index>;
/// Returns the last index; the storage test expects `0` on a fresh
/// storage.
async fn get_last_index(&self) -> anyhow::Result<Index>;
/// Persists the ballot `v`; the storage test expects a following
/// `load_ballot` to return an equal value.
async fn save_ballot(&self, v: Ballot) -> anyhow::Result<()>;
/// Loads the stored ballot; the storage test expects term `0` and no vote
/// on a fresh storage.
async fn load_ballot(&self) -> anyhow::Result<Ballot>;
/// Associates the snapshot tag `snapshot` with index `i`.
async fn put_tag(&self, i: Index, snapshot: crate::SnapshotTag) -> anyhow::Result<()>;
/// Removes the tag associated with index `i`.
/// NOTE(review): behavior when no tag exists at `i` is not visible here.
async fn delete_tag(&self, i: Index) -> anyhow::Result<()>;
/// Returns the tag stored for index `i`, or `None` when there is none.
async fn get_tag(&self, i: Index) -> anyhow::Result<Option<crate::SnapshotTag>>;
/// Returns a sorted set of indices.
/// NOTE(review): presumably the indices that currently have a tag — confirm.
async fn list_tags(&self) -> anyhow::Result<BTreeSet<Index>>;
}
/// Shared check that drives a freshly created [`RaftStorage`] through ballot,
/// tag, entry and snapshot operations and asserts the expected results.
///
/// `s` must be an empty storage; the first assertions rely on the initial
/// ballot, indices and tag lookups being in their default state.
#[cfg(test)]
async fn test_storage<S: RaftStorage>(s: S) -> anyhow::Result<()> {
    // Every insert below stores a copy of this same placeholder entry; only
    // the index passed alongside it varies.
    let template = Entry {
        prev_clock: Clock { term: 0, index: 0 },
        this_clock: Clock { term: 0, index: 0 },
        command: Bytes::new(),
    };

    // --- Ballot: starts empty, then round-trips a saved value. ---
    let initial = Ballot { cur_term: 0, voted_for: None };
    assert_eq!(s.load_ballot().await?, initial);
    let voted = Ballot { cur_term: 1, voted_for: Some("hoge".to_owned()) };
    s.save_ballot(voted.clone()).await?;
    assert_eq!(s.load_ballot().await?, voted);

    // --- Tags: absent until put, then returned unchanged. ---
    let tag: crate::SnapshotTag = vec![].into();
    assert!(s.get_tag(10).await?.is_none());
    s.put_tag(10, tag.clone()).await?;
    assert_eq!(s.get_tag(10).await?, Some(tag));

    // --- Log: a fresh storage reports index 0 and holds no entries. ---
    assert_eq!(s.get_snapshot_index().await?, 0);
    assert_eq!(s.get_last_index().await?, 0);
    assert!(s.get_entry(1).await?.is_none());

    // The first snapshot moves both the last index and the snapshot index.
    s.insert_snapshot(1, template.clone()).await?;
    assert_eq!(s.get_last_index().await?, 1);
    assert_eq!(s.get_snapshot_index().await?, 1);

    // Append entries 2 through 5 after the snapshot.
    for i in 2..=5 {
        s.insert_entry(i, template.clone()).await?;
    }
    assert_eq!(s.get_last_index().await?, 5);

    // A newer snapshot advances the snapshot index ...
    s.insert_snapshot(4, template.clone()).await?;
    assert_eq!(s.get_snapshot_index().await?, 4);

    // ... while an older one must not move it backwards.
    s.insert_snapshot(2, template).await?;
    assert_eq!(s.get_snapshot_index().await?, 4);

    // Entry 1 stays readable until everything before index 4 is deleted.
    assert!(s.get_entry(1).await?.is_some());
    s.delete_before(4).await?;
    assert!(s.get_entry(1).await?.is_none());

    Ok(())
}