asteroid_mq/protocol/node/raft/
log_storage.rs1use super::TypeConfig;
2use openraft::{storage::RaftLogStorage, LogId, RaftLogReader, RaftTypeConfig, Vote};
3use openraft::{LogState, RaftLogId, StorageError};
4use std::fmt::Debug;
5use std::ops::RangeBounds;
6use std::{collections::BTreeMap, sync::Arc};
7
8#[derive(Clone)]
9pub struct LogStorage<C: RaftTypeConfig = TypeConfig> {
10 inner: Arc<tokio::sync::Mutex<LogStorageInner<C>>>,
11}
12
13impl Default for LogStorage<TypeConfig> {
14 fn default() -> Self {
15 Self {
16 inner: Arc::new(tokio::sync::Mutex::new(LogStorageInner::default())),
17 }
18 }
19}
20#[derive(Debug, Default)]
21pub struct LogStorageInner<C: RaftTypeConfig> {
22 last_purged_log_id: Option<LogId<C::NodeId>>,
24
25 log: BTreeMap<u64, C::Entry>,
27
28 committed: Option<LogId<C::NodeId>>,
30
31 vote: Option<Vote<C::NodeId>>,
33}
34
35impl<C: RaftTypeConfig> LogStorageInner<C> {
36 pub fn new() -> Self {
37 Self {
38 last_purged_log_id: None,
39 log: BTreeMap::new(),
40 committed: None,
41 vote: None,
42 }
43 }
44 async fn try_get_log_entries<RB: RangeBounds<u64> + Clone + Debug>(
45 &mut self,
46 range: RB,
47 ) -> Result<Vec<C::Entry>, StorageError<C::NodeId>>
48 where
49 C::Entry: Clone,
50 {
51 let response = self
52 .log
53 .range(range.clone())
54 .map(|(_, val)| val.clone())
55 .collect::<Vec<_>>();
56 Ok(response)
57 }
58
59 async fn get_log_state(&self) -> Result<openraft::LogState<C>, StorageError<C::NodeId>> {
60 let last = self
61 .log
62 .iter()
63 .next_back()
64 .map(|(_, ent)| ent.get_log_id().clone());
65
66 let last_purged = self.last_purged_log_id.clone();
67
68 let last = match last {
69 None => last_purged.clone(),
70 Some(x) => Some(x),
71 };
72
73 Ok(LogState {
74 last_purged_log_id: last_purged,
75 last_log_id: last,
76 })
77 }
78
79 async fn save_committed(
80 &mut self,
81 committed: Option<LogId<C::NodeId>>,
82 ) -> Result<(), StorageError<C::NodeId>> {
83 self.committed = committed;
84 Ok(())
85 }
86 async fn read_committed(
87 &mut self,
88 ) -> Result<Option<LogId<C::NodeId>>, StorageError<C::NodeId>> {
89 Ok(self.committed.clone())
90 }
91
92 async fn save_vote(&mut self, vote: &Vote<C::NodeId>) -> Result<(), StorageError<C::NodeId>> {
93 self.vote = Some(vote.clone());
94 Ok(())
95 }
96
97 async fn read_vote(&mut self) -> Result<Option<Vote<C::NodeId>>, StorageError<C::NodeId>> {
98 Ok(self.vote.clone())
99 }
100
101 async fn append<I>(
102 &mut self,
103 entries: I,
104 callback: openraft::storage::LogFlushed<C>,
105 ) -> Result<(), StorageError<C::NodeId>>
106 where
107 I: IntoIterator<Item = C::Entry>,
108 {
109 for entry in entries {
111 self.log.insert(entry.get_log_id().index, entry);
112 }
113 callback.log_io_completed(Ok(()));
114
115 Ok(())
116 }
117
118 async fn truncate(&mut self, log_id: LogId<C::NodeId>) -> Result<(), StorageError<C::NodeId>> {
119 let keys = self
120 .log
121 .range(log_id.index..)
122 .map(|(k, _v)| *k)
123 .collect::<Vec<_>>();
124 for key in keys {
125 self.log.remove(&key);
126 }
127
128 Ok(())
129 }
130
131 async fn purge(&mut self, log_id: LogId<C::NodeId>) -> Result<(), StorageError<C::NodeId>> {
132 {
133 let ld = &mut self.last_purged_log_id;
134 assert!(*ld <= Some(log_id.clone()));
135 *ld = Some(log_id.clone());
136 }
137
138 {
139 let keys = self
140 .log
141 .range(..=log_id.index)
142 .map(|(k, _v)| *k)
143 .collect::<Vec<_>>();
144 for key in keys {
145 self.log.remove(&key);
146 }
147 }
148
149 Ok(())
150 }
151}
152
153impl RaftLogReader<TypeConfig> for LogStorage<TypeConfig> {
154 async fn try_get_log_entries<
155 RB: std::ops::RangeBounds<u64> + Clone + std::fmt::Debug + openraft::OptionalSend,
156 >(
157 &mut self,
158 range: RB,
159 ) -> Result<
160 Vec<<TypeConfig as openraft::RaftTypeConfig>::Entry>,
161 openraft::StorageError<<TypeConfig as openraft::RaftTypeConfig>::NodeId>,
162 > {
163 self.inner.lock().await.try_get_log_entries(range).await
164 }
165}
166
167impl RaftLogStorage<TypeConfig> for LogStorage<TypeConfig> {
168 type LogReader = Self;
169 async fn append<I>(
170 &mut self,
171 entries: I,
172 callback: openraft::storage::LogFlushed<TypeConfig>,
173 ) -> Result<(), openraft::StorageError<<TypeConfig as openraft::RaftTypeConfig>::NodeId>>
174 where
175 I: IntoIterator<Item = <TypeConfig as openraft::RaftTypeConfig>::Entry>
176 + openraft::OptionalSend,
177 I::IntoIter: openraft::OptionalSend,
178 {
179 self.inner.lock().await.append(entries, callback).await
180 }
181 async fn get_log_reader(&mut self) -> Self::LogReader {
182 self.clone()
183 }
184 async fn get_log_state(
185 &mut self,
186 ) -> Result<
187 openraft::LogState<TypeConfig>,
188 openraft::StorageError<<TypeConfig as openraft::RaftTypeConfig>::NodeId>,
189 > {
190 self.inner.lock().await.get_log_state().await
191 }
192 async fn purge(
193 &mut self,
194 log_id: openraft::LogId<<TypeConfig as openraft::RaftTypeConfig>::NodeId>,
195 ) -> Result<(), openraft::StorageError<<TypeConfig as openraft::RaftTypeConfig>::NodeId>> {
196 self.inner.lock().await.purge(log_id).await
197 }
198 async fn read_committed(
199 &mut self,
200 ) -> Result<
201 Option<openraft::LogId<<TypeConfig as openraft::RaftTypeConfig>::NodeId>>,
202 openraft::StorageError<<TypeConfig as openraft::RaftTypeConfig>::NodeId>,
203 > {
204 self.inner.lock().await.read_committed().await
205 }
206 async fn save_committed(
207 &mut self,
208 committed: Option<openraft::LogId<<TypeConfig as openraft::RaftTypeConfig>::NodeId>>,
209 ) -> Result<(), openraft::StorageError<<TypeConfig as openraft::RaftTypeConfig>::NodeId>> {
210 self.inner.lock().await.save_committed(committed).await
211 }
212 async fn read_vote(
213 &mut self,
214 ) -> Result<
215 Option<openraft::Vote<<TypeConfig as openraft::RaftTypeConfig>::NodeId>>,
216 openraft::StorageError<<TypeConfig as openraft::RaftTypeConfig>::NodeId>,
217 > {
218 self.inner.lock().await.read_vote().await
219 }
220 async fn save_vote(
221 &mut self,
222 vote: &openraft::Vote<<TypeConfig as openraft::RaftTypeConfig>::NodeId>,
223 ) -> Result<(), openraft::StorageError<<TypeConfig as openraft::RaftTypeConfig>::NodeId>> {
224 self.inner.lock().await.save_vote(vote).await
225 }
226 async fn truncate(
227 &mut self,
228 log_id: openraft::LogId<<TypeConfig as openraft::RaftTypeConfig>::NodeId>,
229 ) -> Result<(), openraft::StorageError<<TypeConfig as openraft::RaftTypeConfig>::NodeId>> {
230 self.inner.lock().await.truncate(log_id).await
231 }
232}