1use crate::{Clock, Command, Id, Index, Term};
2use anyhow::Result;
3use bytes::Bytes;
4
5pub mod memory;
7
8pub mod file;
10
11#[cfg(feature = "rocksdb-backend")]
13#[cfg_attr(docsrs, doc(cfg(feature = "rocksdb-backend")))]
14pub mod rocksdb;
15
16mod persistency;
17
18#[derive(Clone, Debug, PartialEq)]
21pub struct Ballot {
22 pub(crate) cur_term: Term,
23 pub(crate) voted_for: Option<Id>,
24}
25impl Ballot {
26 pub fn new() -> Self {
28 Self {
29 cur_term: 0,
30 voted_for: None,
31 }
32 }
33}
34
35#[derive(Clone)]
37pub struct Entry {
38 pub(crate) prev_clock: Clock,
39 pub(crate) this_clock: Clock,
40 pub(crate) command: Bytes,
41}
42
43#[async_trait::async_trait]
46pub trait RaftStorage: Sync + Send + 'static {
47 async fn insert_entry(&self, i: Index, e: Entry) -> Result<()>;
48 async fn delete_entry(&self, i: Index) -> Result<()>;
49 async fn get_entry(&self, i: Index) -> Result<Option<Entry>>;
50 async fn get_head_index(&self) -> Result<Index>;
51 async fn get_last_index(&self) -> Result<Index>;
52 async fn save_ballot(&self, v: Ballot) -> Result<()>;
53 async fn load_ballot(&self) -> Result<Ballot>;
54}
55
56pub(crate) async fn find_last_snapshot_index<S: RaftStorage>(storage: &S) -> Result<Option<Index>> {
57 let last = storage.get_last_index().await?;
58 for i in (1..=last).rev() {
59 let e = storage.get_entry(i).await?.unwrap();
60 match Command::deserialize(&e.command) {
61 Command::Snapshot { .. } => return Ok(Some(i)),
62 _ => {}
63 }
64 }
65 Ok(None)
66}
67
/// Backend-independent check of a [`RaftStorage`] implementation.
///
/// Exercises, in order: ballot round-tripping, the empty-log state,
/// inserting and overwriting entries, snapshot lookup through
/// [`find_last_snapshot_index`], deleting the oldest entries, and appending
/// a long run of entries. Presumably called from each backend's own tests
/// with a freshly created storage -- the first assertions require an
/// untouched ballot and an empty log.
#[cfg(test)]
async fn test_storage<S: RaftStorage>(s: S) -> Result<()> {
    use crate::Uri;
    use std::collections::HashSet;

    // Two entry templates that differ only in the command they carry.
    let noop = Entry {
        prev_clock: Clock { term: 0, index: 0 },
        this_clock: Clock { term: 0, index: 0 },
        command: Command::serialize(&Command::Noop),
    };
    let snapshot = Entry {
        prev_clock: Clock { term: 0, index: 0 },
        this_clock: Clock { term: 0, index: 0 },
        command: Command::serialize(&Command::Snapshot {
            membership: HashSet::new(),
        }),
    };

    // --- Ballot: initial value, then save followed by load. ---
    let uri: Uri = "https://192.168.100.98:50001".parse().unwrap();
    let id: Id = uri.into();

    let initial = Ballot {
        cur_term: 0,
        voted_for: None,
    };
    assert_eq!(s.load_ballot().await?, initial);

    let voted = Ballot {
        cur_term: 1,
        voted_for: Some(id),
    };
    s.save_ballot(voted.clone()).await?;
    assert_eq!(s.load_ballot().await?, voted);

    // --- Empty log. ---
    assert_eq!(find_last_snapshot_index(&s).await?, None);
    assert_eq!(s.get_last_index().await?, 0);
    assert!(s.get_entry(1).await?.is_none());

    // --- A snapshot at index 1, followed by plain entries at 2..=5. ---
    s.insert_entry(1, snapshot.clone()).await?;
    assert_eq!(s.get_head_index().await?, 1);
    assert_eq!(s.get_last_index().await?, 1);
    assert_eq!(find_last_snapshot_index(&s).await?, Some(1));
    for index in 2..=5 {
        s.insert_entry(index, noop.clone()).await?;
    }
    assert_eq!(s.get_head_index().await?, 1);
    assert_eq!(s.get_last_index().await?, 5);

    // --- Overwrite existing indices with snapshots; the highest one wins. ---
    s.insert_entry(4, snapshot.clone()).await?;
    assert_eq!(s.get_head_index().await?, 1);
    assert_eq!(find_last_snapshot_index(&s).await?, Some(4));
    s.insert_entry(2, snapshot).await?;
    assert_eq!(find_last_snapshot_index(&s).await?, Some(4));

    // --- Delete everything below index 4; the head moves up to 4. ---
    assert!(s.get_entry(1).await?.is_some());
    for index in 1..=3 {
        s.delete_entry(index).await?;
    }
    assert_eq!(s.get_head_index().await?, 4);
    assert!(s.get_entry(1).await?.is_none());

    // --- Append many entries; only the last index changes. ---
    for index in 6..=1000 {
        s.insert_entry(index, noop.clone()).await?;
    }
    assert_eq!(s.get_head_index().await?, 4);
    assert_eq!(s.get_last_index().await?, 1000);

    Ok(())
}