d_engine/storage/
storage_engine_test.rs1use std::ops::RangeInclusive;
2use std::sync::Arc;
3
4use prost::Message;
5use tonic::async_trait;
6
7use crate::proto::client::write_command::Insert;
8use crate::proto::common::Entry;
9use crate::proto::common::LogId;
10use crate::proto::election::VotedFor;
11use crate::storage::StorageEngine;
12use crate::Error;
13use crate::HardState;
14use crate::LogStore;
15use crate::MetaStore;
16
/// Namespace for the shared storage-engine conformance tests.
///
/// The type is a stateless unit struct: it is never instantiated by the suite
/// and exists only so the test functions can live in one `impl` block.
pub struct StorageEngineTestSuite;
24
25#[async_trait]
27pub trait StorageEngineBuilder: Send + Sync {
28 type Engine: StorageEngine + Send + Sync + 'static;
30
31 async fn build(&self) -> Result<Arc<Self::Engine>, Error>;
33
34 async fn cleanup(&self) -> Result<(), Error>;
36}
37
38impl StorageEngineTestSuite {
39 pub async fn run_all_tests<B: StorageEngineBuilder>(builder: B) -> Result<(), Error> {
41 Self::test_empty_storage(builder.build().await?).await?;
42 Self::test_single_entry_persistence(builder.build().await?).await?;
43 Self::test_batch_persistence(builder.build().await?).await?;
44 Self::test_purge_logs(builder.build().await?).await?;
45 Self::test_truncation(builder.build().await?).await?;
46 Self::test_reset_operation(builder.build().await?).await?;
47 Self::test_edge_cases(builder.build().await?).await?;
48 Self::test_hard_state_persistence(builder.build().await?).await?;
49 Self::test_reset_preserves_meta(builder.build().await?).await?;
50 Self::test_flush_persists_all_data(builder.build().await?).await?;
51
52 builder.cleanup().await?;
53 Ok(())
54 }
55
56 async fn test_empty_storage<E>(engine: Arc<E>) -> Result<(), Error>
58 where
59 E: StorageEngine + Send + Sync + 'static,
60 E::LogStore: Send + Sync,
61 E::MetaStore: Send + Sync,
62 {
63 let log_store = engine.log_store();
64
65 assert_eq!(log_store.last_index(), 0);
66 assert!(log_store.entry(1).await?.is_none());
67 assert!(log_store.get_entries(1..=5)?.is_empty());
68
69 Ok(())
70 }
71
72 async fn test_single_entry_persistence<E>(engine: Arc<E>) -> Result<(), Error>
74 where
75 E: StorageEngine + Send + Sync + 'static,
76 E::LogStore: Send + Sync,
77 E::MetaStore: Send + Sync,
78 {
79 let log_store = engine.log_store();
80 let entries = create_test_entries(1..=1);
81
82 log_store.persist_entries(entries.clone()).await?;
84 assert_eq!(log_store.last_index(), 1);
85 assert_eq!(log_store.entry(1).await?.unwrap(), entries[0]);
86 assert_eq!(log_store.get_entries(1..=1)?, entries);
87
88 Ok(())
89 }
90
91 async fn test_batch_persistence<E>(engine: Arc<E>) -> Result<(), Error>
93 where
94 E: StorageEngine + Send + Sync + 'static,
95 E::LogStore: Send + Sync,
96 E::MetaStore: Send + Sync,
97 {
98 let log_store = engine.log_store();
99 let entries = create_test_entries(1..=100);
100
101 log_store.persist_entries(entries.clone()).await?;
102
103 assert_eq!(log_store.last_index(), 100);
105
106 assert_eq!(log_store.entry(1).await?.unwrap(), entries[0]);
108 assert_eq!(log_store.entry(50).await?.unwrap(), entries[49]);
109 assert_eq!(log_store.entry(100).await?.unwrap(), entries[99]);
110
111 let range = log_store.get_entries(25..=75)?;
113 assert_eq!(range.len(), 51);
114 assert_eq!(range[0], entries[24]);
115 assert_eq!(range[50], entries[74]);
116
117 Ok(())
118 }
119
120 async fn test_purge_logs<E>(engine: Arc<E>) -> Result<(), Error>
122 where
123 E: StorageEngine + Send + Sync + 'static,
124 E::LogStore: Send + Sync,
125 E::MetaStore: Send + Sync,
126 {
127 let log_store = engine.log_store();
128 log_store.persist_entries(create_test_entries(1..=100)).await?;
129
130 log_store
132 .purge(LogId {
133 index: 50,
134 term: 50,
135 })
136 .await?;
137
138 assert!(log_store.entry(1).await?.is_none());
139 assert!(log_store.entry(50).await?.is_none());
140 assert!(log_store.entry(51).await?.is_some());
141
142 Ok(())
143 }
144
145 async fn test_truncation<E>(engine: Arc<E>) -> Result<(), Error>
147 where
148 E: StorageEngine + Send + Sync + 'static,
149 E::LogStore: Send + Sync,
150 E::MetaStore: Send + Sync,
151 {
152 let log_store = engine.log_store();
153 log_store.reset().await?;
154 log_store.persist_entries(create_test_entries(1..=100)).await?;
155 log_store.flush()?;
157
158 log_store.truncate(76).await?;
160 log_store.flush()?;
162
163 assert_eq!(log_store.last_index(), 75);
164 assert!(log_store.entry(76).await?.is_none());
165 assert!(log_store.entry(100).await?.is_none());
166 assert!(log_store.entry(75).await?.is_some());
167
168 Ok(())
169 }
170
171 async fn test_reset_operation<E>(engine: Arc<E>) -> Result<(), Error>
173 where
174 E: StorageEngine + Send + Sync + 'static,
175 E::LogStore: Send + Sync,
176 E::MetaStore: Send + Sync,
177 {
178 let log_store = engine.log_store();
179 log_store.persist_entries(create_test_entries(1..=50)).await?;
180
181 log_store.reset().await?;
182
183 assert_eq!(log_store.last_index(), 0);
184 assert!(log_store.entry(1).await?.is_none());
185
186 Ok(())
187 }
188
189 async fn test_edge_cases<E>(engine: Arc<E>) -> Result<(), Error>
191 where
192 E: StorageEngine + Send + Sync + 'static,
193 E::LogStore: Send + Sync,
194 E::MetaStore: Send + Sync,
195 {
196 let log_store = engine.log_store();
197
198 log_store.persist_entries(vec![]).await?;
200
201 assert!(log_store.get_entries(100..=200)?.is_empty());
203
204 Ok(())
205 }
206
207 async fn test_hard_state_persistence<E>(engine: Arc<E>) -> Result<(), Error>
209 where
210 E: StorageEngine + Send + Sync + 'static,
211 E::LogStore: Send + Sync,
212 E::MetaStore: Send + Sync,
213 {
214 let meta_store = engine.meta_store();
215 let hard_state = create_test_hard_state(5, Some((10, 4)));
216
217 meta_store.save_hard_state(&hard_state)?;
219 let loaded = meta_store.load_hard_state()?.unwrap();
220 assert_eq!(loaded.current_term, 5);
221
222 Ok(())
223 }
224
225 async fn test_reset_preserves_meta<E>(engine: Arc<E>) -> Result<(), Error>
227 where
228 E: StorageEngine + Send + Sync + 'static,
229 E::LogStore: Send + Sync,
230 E::MetaStore: Send + Sync,
231 {
232 let log_store = engine.log_store();
233 let meta_store = engine.meta_store();
234 let hard_state = create_test_hard_state(3, Some((5, 4)));
235
236 meta_store.save_hard_state(&hard_state)?;
237
238 log_store.reset().await?;
240
241 let loaded = meta_store.load_hard_state()?;
242 assert!(loaded.is_some());
243 assert_eq!(loaded.unwrap().current_term, 3);
244
245 Ok(())
246 }
247
248 async fn test_flush_persists_all_data<E>(engine: Arc<E>) -> Result<(), Error>
250 where
251 E: StorageEngine + Send + Sync + 'static,
252 E::LogStore: Send + Sync,
253 E::MetaStore: Send + Sync,
254 {
255 let log_store = engine.log_store();
256 let meta_store = engine.meta_store();
257
258 log_store.persist_entries(create_test_entries(1..=5)).await?;
260 meta_store.save_hard_state(&create_test_hard_state(2, Some((1, 2))))?;
261
262 log_store.flush()?;
264 meta_store.flush()?;
265
266 Ok(())
267 }
268}
269
270fn create_test_entries(range: RangeInclusive<u64>) -> Vec<Entry> {
272 range
273 .map(|i| Entry {
274 index: i,
275 term: i,
276 payload: Some(create_test_command_payload(i)),
277 })
278 .collect()
279}
280
281fn create_test_command_payload(index: u64) -> crate::proto::common::EntryPayload {
283 let key = format!("key_{index}").into_bytes();
285 let value = format!("value_{index}").into_bytes();
286
287 let insert = Insert { key, value };
288 let operation = crate::proto::client::write_command::Operation::Insert(insert);
289 let write_cmd = crate::proto::client::WriteCommand {
290 operation: Some(operation),
291 };
292
293 crate::proto::common::EntryPayload {
294 payload: Some(crate::proto::common::entry_payload::Payload::Command(
295 write_cmd.encode_to_vec(),
296 )),
297 }
298}
299
300fn create_test_hard_state(
302 current_term: u64,
303 voted_for: Option<(u32, u64)>,
304) -> HardState {
305 let voted_for = voted_for.map(|(id, term)| VotedFor {
306 voted_for_id: id,
307 voted_for_term: term,
308 });
309
310 HardState {
311 current_term,
312 voted_for,
313 }
314}