Skip to main content

random_access_memory/
lib.rs

1#![forbid(unsafe_code, missing_docs)]
2#![cfg_attr(test, deny(warnings))]
3#![doc(test(attr(deny(warnings))))]
4//! # Continuously read and write to memory using random offsets and lengths
5//! [RandomAccessMemory] is a complete implementation of [random-access-storage](https://docs.rs/random-access-storage)
6//! for in-memory storage.
7//!
8//! See also [random-access-disk](https://docs.rs/random-access-disk) for on-disk storage
9//! that can be swapped with this.
10//!
11//! ## Examples
12//!
13//! Reading, writing, deleting and truncating:
14//!
15//! ```
16//! # async_std::task::block_on(async {
17//! use random_access_memory::RandomAccessMemory;
18//! use random_access_storage::RandomAccess;
19//!
20//! let storage = RandomAccessMemory::default();
21//! storage.write(0, b"hello").await.unwrap();
22//! storage.write(5, b" world").await.unwrap();
23//! assert_eq!(storage.read(0, 11).await.unwrap(), b"hello world");
24//! assert_eq!(storage.len(), 11);
25//! storage.del(5, 2).await.unwrap();
26//! assert_eq!(storage.read(5, 2).await.unwrap(), [0, 0]);
27//! assert_eq!(storage.len(), 11);
28//! storage.truncate(2).await.unwrap();
29//! assert_eq!(storage.len(), 2);
30//! storage.truncate(5).await.unwrap();
31//! assert_eq!(storage.len(), 5);
32//! assert_eq!(storage.read(0, 5).await.unwrap(), [b'h', b'e', 0, 0, 0]);
33//! # })
34//! ```
35//!
36//! In order to get benefits from the swappable interface, you will
37//! in most cases want to use generic functions for storage manipulation:
38//!
39//! ```
40//! # async_std::task::block_on(async {
41//! use random_access_memory::RandomAccessMemory;
42//! use random_access_storage::RandomAccess;
43//! use std::fmt::Debug;
44//!
45//! let storage = RandomAccessMemory::default();
46//! write_hello_world(&storage).await;
47//! assert_eq!(read_hello_world(&storage).await, b"hello world");
48//!
49//! /// Write with swappable storage
50//! async fn write_hello_world<T>(storage: &T)
51//! where
52//!     T: RandomAccess + Debug + Send,
53//! {
54//!     storage.write(0, b"hello").await.unwrap();
55//!     storage.write(5, b" world").await.unwrap();
56//! }
57//!
58//! /// Read with swappable storage
59//! async fn read_hello_world<T>(storage: &T) -> Vec<u8>
60//! where
61//!     T: RandomAccess + Debug + Send,
62//! {
63//!     storage.read(0, 11).await.unwrap()
64//! }
65//! # })
66//! ```
67
68pub use intmap::IntMap;
69
70use random_access_storage::{BoxFuture, RandomAccess, RandomAccessError};
71use std::{
72    cmp,
73    sync::{Arc, Mutex},
74};
75
/// Internal mutable state behind [RandomAccessMemory].
#[derive(Debug)]
struct MemoryInner {
    // Size in bytes of each page; byte offsets map to (page number, index-in-page).
    page_size: usize,
    // Sparse map from page number to its backing buffer. Pages that were never
    // written (or were dropped while zeroing) are absent and read back as zeros.
    buffers: IntMap<Vec<u8>>,
    // Logical length of the stored data in bytes — not the allocated capacity.
    length: u64,
}
83
84#[allow(clippy::needless_range_loop)]
85impl MemoryInner {
86    /// Returns the page number and index within that page for a given offset.
87    /// If `exclusive_end` is true, when hitting the exact border of two pages
88    /// gives the previous page and page size as index.
89    fn page_num_and_index(&self, offset: u64, exclusive_end: bool) -> (usize, usize) {
90        let page_num = (offset / (self.page_size as u64)) as usize;
91        let page_index = (offset % (self.page_size as u64)) as usize;
92        if page_index == 0 && exclusive_end {
93            (if page_num > 0 { page_num - 1 } else { 0 }, self.page_size)
94        } else {
95            (page_num, page_index)
96        }
97    }
98
99    /// Zero given range
100    fn zero(&mut self, offset: u64, length: u64) {
101        let (first_page_num, first_page_start) = self.page_num_and_index(offset, false);
102        let (last_page_num, last_page_end) = self.page_num_and_index(offset + length, true);
103
104        // Check if we need to zero bytes in the first page
105        if (first_page_start > 0 || (first_page_num == last_page_num && last_page_end > 0))
106            && let Some(page) = self.buffers.get_mut(first_page_num as u64)
107        {
108            // Need to zero part of the first page
109            let begin_page_end =
110                first_page_start + cmp::min(length as usize, self.page_size - first_page_start);
111            for index in first_page_start..begin_page_end {
112                page[index] = 0;
113            }
114        }
115
116        // Delete intermediate pages
117        if last_page_num > first_page_num + 1
118            || (first_page_start == 0 && last_page_num == first_page_num + 1)
119        {
120            let first_page_to_drop = if first_page_start == 0 {
121                first_page_num
122            } else {
123                first_page_num + 1
124            };
125
126            for index in first_page_to_drop..last_page_num {
127                self.buffers.remove(index as u64);
128            }
129        }
130
131        // Finally zero the last page
132        if last_page_num > first_page_num
133            && last_page_end > 0
134            && let Some(page) = self.buffers.get_mut(last_page_num as u64)
135        {
136            // Need to zero part of the final page
137            for index in 0..last_page_end {
138                page[index] = 0;
139            }
140        }
141    }
142
143    fn do_read(&self, offset: u64, length: u64) -> Result<Vec<u8>, RandomAccessError> {
144        if (offset + length) > self.length {
145            return Err(RandomAccessError::OutOfBounds {
146                offset,
147                end: Some(offset + length),
148                length: self.length,
149            });
150        };
151
152        let mut page_num = (offset / self.page_size as u64) as usize;
153        let mut page_cursor = (offset - (page_num * self.page_size) as u64) as usize;
154
155        let mut res_buf = vec![0; length as usize];
156        let mut res_cursor = 0; // Keep track we read the right amount of bytes.
157        let res_capacity = length;
158
159        while res_cursor < res_capacity {
160            let res_bound = res_capacity - res_cursor;
161            let page_bound = self.page_size - page_cursor;
162            let relative_bound = cmp::min(res_bound, page_bound as u64);
163            let upper_bound = page_cursor + relative_bound as usize;
164            let range = page_cursor..upper_bound;
165
166            // Fill until either we're done reading the page, or we're done
167            // filling the buffer. Whichever arrives sooner.
168            match self.buffers.get(page_num as u64) {
169                Some(buf) => {
170                    for (index, buf_index) in range.enumerate() {
171                        res_buf[res_cursor as usize + index] = buf[buf_index];
172                    }
173                }
174                None => {
175                    for (index, _) in range.enumerate() {
176                        res_buf[res_cursor as usize + index] = 0;
177                    }
178                }
179            }
180
181            res_cursor += relative_bound;
182            page_num += 1;
183            page_cursor = 0;
184        }
185
186        Ok(res_buf)
187    }
188
189    fn do_write(&mut self, offset: u64, data: &[u8]) -> Result<(), RandomAccessError> {
190        let new_len = offset + data.len() as u64;
191        if new_len > self.length {
192            self.length = new_len;
193        }
194
195        let mut page_num = (offset / self.page_size as u64) as usize;
196        let mut page_cursor = (offset - (page_num * self.page_size) as u64) as usize;
197        let mut data_cursor = 0;
198
199        // Iterate over data, write to buffers. Subslice if the data is bigger than
200        // what we can write in a single go.
201        while data_cursor < data.len() {
202            let data_bound = data.len() - data_cursor;
203            let upper_bound = cmp::min(self.page_size, page_cursor + data_bound);
204            let range = page_cursor..upper_bound;
205            let range_len = (page_cursor..upper_bound).len();
206
207            // Allocate buffer if needed. Either append a new buffer to the end, or
208            // set a buffer in the center.
209            if self.buffers.get(page_num as u64).is_none() {
210                let buf = vec![0; self.page_size];
211                self.buffers.insert(page_num as u64, buf);
212            }
213
214            // Copy data from the vec slice.
215            // TODO: use a batch operation such as `.copy_from_slice()` so it can be
216            // optimized.
217            let buffer = &mut self.buffers.get_mut(page_num as u64).unwrap();
218            for (index, buf_index) in range.enumerate() {
219                buffer[buf_index] = data[data_cursor + index];
220            }
221
222            page_num += 1;
223            page_cursor = 0;
224            data_cursor += range_len;
225        }
226
227        Ok(())
228    }
229
230    fn do_del(&mut self, offset: u64, length: u64) -> Result<(), RandomAccessError> {
231        if offset > self.length {
232            return Err(RandomAccessError::OutOfBounds {
233                offset,
234                end: None,
235                length: self.length,
236            });
237        };
238
239        if length == 0 {
240            // No-op
241            return Ok(());
242        }
243
244        // Delete is truncate if up to the current length or more is deleted
245        if offset + length >= self.length {
246            return self.do_truncate(offset);
247        }
248
249        // Deleting means zeroing
250        self.zero(offset, length);
251        Ok(())
252    }
253
254    #[allow(clippy::comparison_chain)]
255    fn do_truncate(&mut self, length: u64) -> Result<(), RandomAccessError> {
256        let (current_last_page_num, _) = self.page_num_and_index(self.length, true);
257
258        if self.length < length {
259            let truncate_page_num = (length / self.page_size as u64) as usize;
260            // Remove all of the pages between the old length and this newer
261            // length that might have been left behind.
262            for index in current_last_page_num + 1..truncate_page_num + 1 {
263                self.buffers.remove(index as u64);
264            }
265        } else if self.length > length {
266            let delete_length = ((current_last_page_num + 1) * self.page_size) - length as usize;
267            // Make sure to zero the remainder to not leave anything but
268            // zeros lying around.
269            self.zero(length, delete_length as u64);
270        }
271
272        // Set new length
273        self.length = length;
274
275        Ok(())
276    }
277}
278
/// In-memory storage for random access
#[derive(Debug, Clone)]
pub struct RandomAccessMemory {
    // Shared, lock-protected state: clones of this handle operate on the
    // same underlying storage (Clone bumps the Arc, it does not copy data).
    inner: Arc<Mutex<MemoryInner>>,
}
284
285impl Default for RandomAccessMemory {
286    /// Create a new instance with a 1mb page size.
287    fn default() -> Self {
288        RandomAccessMemory::new(1024 * 1024)
289    }
290}
291
292impl RandomAccessMemory {
293    /// Create a new instance with `page_size` in bytes.
294    pub fn new(page_size: usize) -> Self {
295        RandomAccessMemory::with_buffers(page_size, IntMap::new())
296    }
297
298    /// Create a new instance with `page_size` in bytes, but pass the initial buffers to the constructor.
299    pub fn with_buffers(page_size: usize, buffers: IntMap<Vec<u8>>) -> Self {
300        RandomAccessMemory {
301            inner: Arc::new(Mutex::new(MemoryInner {
302                page_size,
303                buffers,
304                length: 0,
305            })),
306        }
307    }
308}
309
310impl RandomAccess for RandomAccessMemory {
311    fn write(&self, offset: u64, data: &[u8]) -> BoxFuture<Result<(), RandomAccessError>> {
312        let data = data.to_vec();
313        let inner = self.inner.clone();
314        Box::pin(std::future::ready(
315            inner.lock().unwrap().do_write(offset, &data),
316        ))
317    }
318
319    fn read(&self, offset: u64, length: u64) -> BoxFuture<Result<Vec<u8>, RandomAccessError>> {
320        let inner = self.inner.clone();
321        Box::pin(std::future::ready(
322            inner.lock().unwrap().do_read(offset, length),
323        ))
324    }
325
326    fn del(&self, offset: u64, length: u64) -> BoxFuture<Result<(), RandomAccessError>> {
327        Box::pin(std::future::ready(
328            self.inner.lock().unwrap().do_del(offset, length),
329        ))
330    }
331
332    #[allow(clippy::comparison_chain)]
333    fn truncate(&self, length: u64) -> BoxFuture<Result<(), RandomAccessError>> {
334        Box::pin(std::future::ready(
335            self.inner.lock().unwrap().do_truncate(length),
336        ))
337    }
338
339    fn len(&self) -> u64 {
340        self.inner.lock().unwrap().length
341    }
342}