commonware_runtime/storage/
mod.rs

1//! Implementations of the `Storage` trait that can be used by the runtime.
2pub mod audited;
3#[cfg(feature = "iouring-storage")]
4pub mod iouring;
5pub mod memory;
6pub mod metered;
7#[cfg(all(not(target_arch = "wasm32"), not(feature = "iouring-storage")))]
8pub mod tokio;
9
10#[cfg(test)]
11pub(crate) mod tests {
12    use crate::{Blob, Storage};
13
14    /// Runs the full suite of tests on the provided storage implementation.
15    pub(crate) async fn run_storage_tests<S>(storage: S)
16    where
17        S: Storage + Send + Sync + 'static,
18        S::Blob: Send + Sync,
19    {
20        test_open_and_write(&storage).await;
21        test_remove(&storage).await;
22        test_scan(&storage).await;
23        test_concurrent_access(&storage).await;
24        test_large_data(&storage).await;
25        test_overwrite_data(&storage).await;
26        test_read_beyond_bound(&storage).await;
27        test_write_at_large_offset(&storage).await;
28        test_append_data(&storage).await;
29        test_sequential_read_write(&storage).await;
30        test_sequential_chunk_read_write(&storage).await;
31        test_read_empty_blob(&storage).await;
32        test_overlapping_writes(&storage).await;
33        test_truncate_then_open(&storage).await;
34    }
35
36    /// Test opening a blob, writing to it, and reading back the data.
37    async fn test_open_and_write<S>(storage: &S)
38    where
39        S: Storage + Send + Sync,
40        S::Blob: Send + Sync,
41    {
42        let (blob, len) = storage.open("partition", b"test_blob").await.unwrap();
43        assert_eq!(len, 0);
44
45        blob.write_at(Vec::from("hello world"), 0).await.unwrap();
46        let read = blob.read_at(vec![0; 11], 0).await.unwrap();
47
48        assert_eq!(
49            read.as_ref(),
50            b"hello world",
51            "Blob content does not match expected value"
52        );
53    }
54
55    /// Test removing a blob from storage.
56    async fn test_remove<S>(storage: &S)
57    where
58        S: Storage + Send + Sync,
59        S::Blob: Send + Sync,
60    {
61        storage.open("partition", b"test_blob").await.unwrap();
62        storage
63            .remove("partition", Some(b"test_blob"))
64            .await
65            .unwrap();
66
67        let blobs = storage.scan("partition").await.unwrap();
68        assert!(blobs.is_empty(), "Blob was not removed as expected");
69    }
70
71    /// Test scanning a partition for blobs.
72    async fn test_scan<S>(storage: &S)
73    where
74        S: Storage + Send + Sync,
75        S::Blob: Send + Sync,
76    {
77        storage.open("partition", b"blob1").await.unwrap();
78        storage.open("partition", b"blob2").await.unwrap();
79
80        let blobs = storage.scan("partition").await.unwrap();
81        assert_eq!(
82            blobs.len(),
83            2,
84            "Scan did not return the expected number of blobs"
85        );
86        assert!(
87            blobs.contains(&b"blob1".to_vec()),
88            "Blob1 is missing from scan results"
89        );
90        assert!(
91            blobs.contains(&b"blob2".to_vec()),
92            "Blob2 is missing from scan results"
93        );
94    }
95
96    /// Test concurrent access to the same blob.
97    async fn test_concurrent_access<S>(storage: &S)
98    where
99        S: Storage + Send + Sync,
100        S::Blob: Send + Sync,
101    {
102        let (blob, _) = storage.open("partition", b"test_blob").await.unwrap();
103
104        // Initialize blob with data of sufficient length first
105        blob.write_at(b"concurrent write".to_vec(), 0)
106            .await
107            .unwrap();
108
109        // Read and write concurrently
110        let write_task = tokio::spawn({
111            let blob = blob.clone();
112            async move {
113                blob.write_at(b"concurrent write".to_vec(), 0)
114                    .await
115                    .unwrap();
116            }
117        });
118
119        let read_task = tokio::spawn({
120            let blob = blob.clone();
121            async move { blob.read_at(vec![0; 16], 0).await.unwrap() }
122        });
123
124        write_task.await.unwrap();
125        let buffer = read_task.await.unwrap();
126
127        assert_eq!(
128            buffer.as_ref(),
129            b"concurrent write",
130            "Concurrent access failed"
131        );
132    }
133
134    /// Test handling of large data sizes.
135    async fn test_large_data<S>(storage: &S)
136    where
137        S: Storage + Send + Sync,
138        S::Blob: Send + Sync,
139    {
140        let (blob, _) = storage.open("partition", b"large_blob").await.unwrap();
141
142        let large_data = vec![42u8; 10 * 1024 * 1024]; // 10 MB
143        blob.write_at(large_data.clone(), 0).await.unwrap();
144
145        let read = blob.read_at(vec![0; 10 * 1024 * 1024], 0).await.unwrap();
146
147        assert_eq!(read.as_ref(), large_data, "Large data read/write failed");
148    }
149
150    /// Test overwriting data in a blob.
151    async fn test_overwrite_data<S>(storage: &S)
152    where
153        S: Storage + Send + Sync,
154        S::Blob: Send + Sync,
155    {
156        let (blob, _) = storage
157            .open("test_overwrite_data", b"test_blob")
158            .await
159            .unwrap();
160
161        // Write initial data
162        blob.write_at(b"initial data".to_vec(), 0).await.unwrap();
163
164        // Overwrite part of the data
165        blob.write_at(b"overwrite".to_vec(), 8).await.unwrap();
166
167        // Read back the data
168        let read = blob.read_at(vec![0; 17], 0).await.unwrap();
169
170        assert_eq!(
171            read.as_ref(),
172            b"initial overwrite",
173            "Data was not overwritten correctly"
174        );
175    }
176
177    /// Test reading from an offset beyond the written data.
178    async fn test_read_beyond_bound<S>(storage: &S)
179    where
180        S: Storage + Send + Sync,
181        S::Blob: Send + Sync,
182    {
183        let (blob, _) = storage
184            .open("test_read_beyond_written_data", b"test_blob")
185            .await
186            .unwrap();
187
188        // Write some data
189        blob.write_at(b"hello".to_vec(), 0).await.unwrap();
190
191        // Attempt to read beyond the written data
192        let result = blob.read_at(vec![0; 10], 6).await;
193
194        assert!(
195            result.is_err(),
196            "Reading beyond written data should return an error"
197        );
198    }
199
200    /// Test writing data at a large offset.
201    async fn test_write_at_large_offset<S>(storage: &S)
202    where
203        S: Storage + Send + Sync,
204        S::Blob: Send + Sync,
205    {
206        let (blob, _) = storage
207            .open("test_write_at_large_offset", b"test_blob")
208            .await
209            .unwrap();
210
211        // Write data at a large offset
212        blob.write_at(b"offset data".to_vec(), 10_000)
213            .await
214            .unwrap();
215
216        // Read back the data
217        let read = blob.read_at(vec![0; 11], 10_000).await.unwrap();
218        assert_eq!(
219            read.as_ref(),
220            b"offset data",
221            "Data at large offset is incorrect"
222        );
223    }
224
225    /// Test appending data to a blob.
226    async fn test_append_data<S>(storage: &S)
227    where
228        S: Storage + Send + Sync,
229        S::Blob: Send + Sync,
230    {
231        let (blob, _) = storage
232            .open("test_append_data", b"test_blob")
233            .await
234            .unwrap();
235
236        // Write initial data
237        blob.write_at(b"first".to_vec(), 0).await.unwrap();
238
239        // Append data
240        blob.write_at(b"second".to_vec(), 5).await.unwrap();
241
242        // Read back the data
243        let read = blob.read_at(vec![0; 11], 0).await.unwrap();
244        assert_eq!(read.as_ref(), b"firstsecond", "Appended data is incorrect");
245    }
246
247    /// Test reading and writing with interleaved offsets.
248    async fn test_sequential_read_write<S>(storage: &S)
249    where
250        S: Storage + Send + Sync,
251        S::Blob: Send + Sync,
252    {
253        let (blob, _) = storage.open("partition", b"test_blob").await.unwrap();
254
255        // Write data at different offsets
256        blob.write_at(b"first".to_vec(), 0).await.unwrap();
257        blob.write_at(b"second".to_vec(), 10).await.unwrap();
258
259        // Read back the data
260        let read = blob.read_at(vec![0; 5], 0).await.unwrap();
261        assert_eq!(read.as_ref(), b"first", "Data at offset 0 is incorrect");
262
263        let read = blob.read_at(vec![0; 6], 10).await.unwrap();
264        assert_eq!(read.as_ref(), b"second", "Data at offset 10 is incorrect");
265    }
266
267    /// Test writing and reading large data in chunks.
268    async fn test_sequential_chunk_read_write<S>(storage: &S)
269    where
270        S: Storage + Send + Sync,
271        S::Blob: Send + Sync,
272    {
273        let (blob, _) = storage
274            .open("test_large_data_in_chunks", b"large_blob")
275            .await
276            .unwrap();
277
278        let chunk_size = 1024 * 1024; // 1 MB
279        let num_chunks = 10;
280        let data = vec![7u8; chunk_size];
281
282        // Write data in chunks
283        for i in 0..num_chunks {
284            blob.write_at(data.clone(), (i * chunk_size) as u64)
285                .await
286                .unwrap();
287        }
288
289        // Read back the data in chunks
290        let mut read = vec![0u8; chunk_size].into();
291        for i in 0..num_chunks {
292            read = blob.read_at(read, (i * chunk_size) as u64).await.unwrap();
293            assert_eq!(read.as_ref(), data, "Chunk {} is incorrect", i);
294        }
295    }
296
297    /// Test reading from an empty blob.
298    async fn test_read_empty_blob<S>(storage: &S)
299    where
300        S: Storage + Send + Sync,
301        S::Blob: Send + Sync,
302    {
303        let (blob, _) = storage
304            .open("test_read_empty_blob", b"empty_blob")
305            .await
306            .unwrap();
307
308        let result = blob.read_at(vec![0; 1], 0).await;
309        assert!(
310            result.is_err(),
311            "Reading from an empty blob should return an error"
312        );
313    }
314
315    /// Test writing and reading with overlapping writes.
316    async fn test_overlapping_writes<S>(storage: &S)
317    where
318        S: Storage + Send + Sync,
319        S::Blob: Send + Sync,
320    {
321        let (blob, _) = storage
322            .open("test_overlapping_writes", b"test_blob")
323            .await
324            .unwrap();
325
326        // Write overlapping data
327        blob.write_at(b"overlap".to_vec(), 0).await.unwrap();
328        blob.write_at(b"map".to_vec(), 4).await.unwrap();
329
330        // Read back the data
331        let read = blob.read_at(vec![0; 7], 0).await.unwrap();
332        assert_eq!(
333            read.as_ref(),
334            b"overmap",
335            "Overlapping writes are incorrect"
336        );
337    }
338
339    async fn test_truncate_then_open<S>(storage: &S)
340    where
341        S: Storage + Send + Sync,
342        S::Blob: Send + Sync,
343    {
344        {
345            let (blob, _) = storage
346                .open("test_truncate_then_open", b"test_blob")
347                .await
348                .unwrap();
349
350            // Write some data
351            blob.write_at(b"hello world".to_vec(), 0).await.unwrap();
352
353            // Truncate the blob
354            blob.truncate(5).await.unwrap();
355
356            blob.close().await.unwrap();
357        }
358
359        // Reopen the blob
360        let (blob, len) = storage
361            .open("test_truncate_then_open", b"test_blob")
362            .await
363            .unwrap();
364        assert_eq!(len, 5, "Blob length after truncate is incorrect");
365
366        // Read back the data
367        let read = blob.read_at(vec![0; 5], 0).await.unwrap();
368        assert_eq!(read.as_ref(), b"hello", "Truncated data is incorrect");
369    }
370}