d_engine/storage/
storage_engine_test.rs

1use std::ops::RangeInclusive;
2use std::sync::Arc;
3
4use prost::Message;
5use tonic::async_trait;
6
7use crate::proto::client::write_command::Insert;
8use crate::proto::common::Entry;
9use crate::proto::common::LogId;
10use crate::proto::election::VotedFor;
11use crate::storage::StorageEngine;
12use crate::Error;
13use crate::HardState;
14use crate::LogStore;
15use crate::MetaStore;
16
17/// Test suite for StorageEngine implementations
18///
19/// This suite provides comprehensive tests that can be used to validate
20/// any StorageEngine implementation. Developers should implement the
21/// `StorageEngineBuilder` trait and then call `run_all_storage_engine_tests`
22/// with their builder.
23pub struct StorageEngineTestSuite;
24
25/// Builder trait for creating StorageEngine instances for testing
26#[async_trait]
27pub trait StorageEngineBuilder: Send + Sync {
28    /// The concrete StorageEngine type returned by this builder
29    type Engine: StorageEngine + Send + Sync + 'static;
30
31    /// Create a new StorageEngine instance for testing
32    async fn build(&self) -> Result<Arc<Self::Engine>, Error>;
33
34    /// Clean up any resources after testing
35    async fn cleanup(&self) -> Result<(), Error>;
36}
37
38impl StorageEngineTestSuite {
39    /// Run all storage engine tests
40    pub async fn run_all_tests<B: StorageEngineBuilder>(builder: B) -> Result<(), Error> {
41        Self::test_empty_storage(builder.build().await?).await?;
42        Self::test_single_entry_persistence(builder.build().await?).await?;
43        Self::test_batch_persistence(builder.build().await?).await?;
44        Self::test_purge_logs(builder.build().await?).await?;
45        Self::test_truncation(builder.build().await?).await?;
46        Self::test_reset_operation(builder.build().await?).await?;
47        Self::test_edge_cases(builder.build().await?).await?;
48        Self::test_hard_state_persistence(builder.build().await?).await?;
49        Self::test_reset_preserves_meta(builder.build().await?).await?;
50        Self::test_flush_persists_all_data(builder.build().await?).await?;
51
52        builder.cleanup().await?;
53        Ok(())
54    }
55
56    /// Test empty storage behavior
57    async fn test_empty_storage<E>(engine: Arc<E>) -> Result<(), Error>
58    where
59        E: StorageEngine + Send + Sync + 'static,
60        E::LogStore: Send + Sync,
61        E::MetaStore: Send + Sync,
62    {
63        let log_store = engine.log_store();
64
65        assert_eq!(log_store.last_index(), 0);
66        assert!(log_store.entry(1).await?.is_none());
67        assert!(log_store.get_entries(1..=5)?.is_empty());
68
69        Ok(())
70    }
71
72    /// Test single entry persistence and retrieval
73    async fn test_single_entry_persistence<E>(engine: Arc<E>) -> Result<(), Error>
74    where
75        E: StorageEngine + Send + Sync + 'static,
76        E::LogStore: Send + Sync,
77        E::MetaStore: Send + Sync,
78    {
79        let log_store = engine.log_store();
80        let entries = create_test_entries(1..=1);
81
82        // Persist and retrieve
83        log_store.persist_entries(entries.clone()).await?;
84        assert_eq!(log_store.last_index(), 1);
85        assert_eq!(log_store.entry(1).await?.unwrap(), entries[0]);
86        assert_eq!(log_store.get_entries(1..=1)?, entries);
87
88        Ok(())
89    }
90
91    /// Test batch entry persistence and retrieval
92    async fn test_batch_persistence<E>(engine: Arc<E>) -> Result<(), Error>
93    where
94        E: StorageEngine + Send + Sync + 'static,
95        E::LogStore: Send + Sync,
96        E::MetaStore: Send + Sync,
97    {
98        let log_store = engine.log_store();
99        let entries = create_test_entries(1..=100);
100
101        log_store.persist_entries(entries.clone()).await?;
102
103        // Verify all entries
104        assert_eq!(log_store.last_index(), 100);
105
106        // Spot check random entries
107        assert_eq!(log_store.entry(1).await?.unwrap(), entries[0]);
108        assert_eq!(log_store.entry(50).await?.unwrap(), entries[49]);
109        assert_eq!(log_store.entry(100).await?.unwrap(), entries[99]);
110
111        // Verify range query
112        let range = log_store.get_entries(25..=75)?;
113        assert_eq!(range.len(), 51);
114        assert_eq!(range[0], entries[24]);
115        assert_eq!(range[50], entries[74]);
116
117        Ok(())
118    }
119
120    /// Test log purging functionality
121    async fn test_purge_logs<E>(engine: Arc<E>) -> Result<(), Error>
122    where
123        E: StorageEngine + Send + Sync + 'static,
124        E::LogStore: Send + Sync,
125        E::MetaStore: Send + Sync,
126    {
127        let log_store = engine.log_store();
128        log_store.persist_entries(create_test_entries(1..=100)).await?;
129
130        // Purge first 50 entries
131        log_store
132            .purge(LogId {
133                index: 50,
134                term: 50,
135            })
136            .await?;
137
138        assert!(log_store.entry(1).await?.is_none());
139        assert!(log_store.entry(50).await?.is_none());
140        assert!(log_store.entry(51).await?.is_some());
141
142        Ok(())
143    }
144
145    /// Test log truncation functionality
146    async fn test_truncation<E>(engine: Arc<E>) -> Result<(), Error>
147    where
148        E: StorageEngine + Send + Sync + 'static,
149        E::LogStore: Send + Sync,
150        E::MetaStore: Send + Sync,
151    {
152        let log_store = engine.log_store();
153        log_store.reset().await?;
154        log_store.persist_entries(create_test_entries(1..=100)).await?;
155        // Ensure all operations are flushed to disk before truncation
156        log_store.flush()?;
157
158        // Truncate from index 76 onward
159        log_store.truncate(76).await?;
160        // Ensure truncation is persisted
161        log_store.flush()?;
162
163        assert_eq!(log_store.last_index(), 75);
164        assert!(log_store.entry(76).await?.is_none());
165        assert!(log_store.entry(100).await?.is_none());
166        assert!(log_store.entry(75).await?.is_some());
167
168        Ok(())
169    }
170
171    /// Test storage reset functionality
172    async fn test_reset_operation<E>(engine: Arc<E>) -> Result<(), Error>
173    where
174        E: StorageEngine + Send + Sync + 'static,
175        E::LogStore: Send + Sync,
176        E::MetaStore: Send + Sync,
177    {
178        let log_store = engine.log_store();
179        log_store.persist_entries(create_test_entries(1..=50)).await?;
180
181        log_store.reset().await?;
182
183        assert_eq!(log_store.last_index(), 0);
184        assert!(log_store.entry(1).await?.is_none());
185
186        Ok(())
187    }
188
189    /// Test edge cases and error conditions
190    async fn test_edge_cases<E>(engine: Arc<E>) -> Result<(), Error>
191    where
192        E: StorageEngine + Send + Sync + 'static,
193        E::LogStore: Send + Sync,
194        E::MetaStore: Send + Sync,
195    {
196        let log_store = engine.log_store();
197
198        // Empty persistence
199        log_store.persist_entries(vec![]).await?;
200
201        // Out-of-range access
202        assert!(log_store.get_entries(100..=200)?.is_empty());
203
204        Ok(())
205    }
206
207    /// Test hard state persistence and retrieval
208    async fn test_hard_state_persistence<E>(engine: Arc<E>) -> Result<(), Error>
209    where
210        E: StorageEngine + Send + Sync + 'static,
211        E::LogStore: Send + Sync,
212        E::MetaStore: Send + Sync,
213    {
214        let meta_store = engine.meta_store();
215        let hard_state = create_test_hard_state(5, Some((10, 4)));
216
217        // Save and verify
218        meta_store.save_hard_state(&hard_state)?;
219        let loaded = meta_store.load_hard_state()?.unwrap();
220        assert_eq!(loaded.current_term, 5);
221
222        Ok(())
223    }
224
225    /// Test that reset preserves metadata
226    async fn test_reset_preserves_meta<E>(engine: Arc<E>) -> Result<(), Error>
227    where
228        E: StorageEngine + Send + Sync + 'static,
229        E::LogStore: Send + Sync,
230        E::MetaStore: Send + Sync,
231    {
232        let log_store = engine.log_store();
233        let meta_store = engine.meta_store();
234        let hard_state = create_test_hard_state(3, Some((5, 4)));
235
236        meta_store.save_hard_state(&hard_state)?;
237
238        // Reset should clear logs but keep meta
239        log_store.reset().await?;
240
241        let loaded = meta_store.load_hard_state()?;
242        assert!(loaded.is_some());
243        assert_eq!(loaded.unwrap().current_term, 3);
244
245        Ok(())
246    }
247
248    /// Test that flush persists all data
249    async fn test_flush_persists_all_data<E>(engine: Arc<E>) -> Result<(), Error>
250    where
251        E: StorageEngine + Send + Sync + 'static,
252        E::LogStore: Send + Sync,
253        E::MetaStore: Send + Sync,
254    {
255        let log_store = engine.log_store();
256        let meta_store = engine.meta_store();
257
258        // Write to both stores
259        log_store.persist_entries(create_test_entries(1..=5)).await?;
260        meta_store.save_hard_state(&create_test_hard_state(2, Some((1, 2))))?;
261
262        // Flush both stores
263        log_store.flush()?;
264        meta_store.flush()?;
265
266        Ok(())
267    }
268}
269
270/// Helper function to create test entries
271fn create_test_entries(range: RangeInclusive<u64>) -> Vec<Entry> {
272    range
273        .map(|i| Entry {
274            index: i,
275            term: i,
276            payload: Some(create_test_command_payload(i)),
277        })
278        .collect()
279}
280
281/// Helper function to create test command payload
282fn create_test_command_payload(index: u64) -> crate::proto::common::EntryPayload {
283    // Create a simple insert command
284    let key = format!("key_{index}").into_bytes();
285    let value = format!("value_{index}").into_bytes();
286
287    let insert = Insert { key, value };
288    let operation = crate::proto::client::write_command::Operation::Insert(insert);
289    let write_cmd = crate::proto::client::WriteCommand {
290        operation: Some(operation),
291    };
292
293    crate::proto::common::EntryPayload {
294        payload: Some(crate::proto::common::entry_payload::Payload::Command(
295            write_cmd.encode_to_vec(),
296        )),
297    }
298}
299
300/// Helper function to create test hard state
301fn create_test_hard_state(
302    current_term: u64,
303    voted_for: Option<(u32, u64)>,
304) -> HardState {
305    let voted_for = voted_for.map(|(id, term)| VotedFor {
306        voted_for_id: id,
307        voted_for_term: term,
308    });
309
310    HardState {
311        current_term,
312        voted_for,
313    }
314}