random_access_memory/lib.rs

#![forbid(unsafe_code, missing_docs)]
#![cfg_attr(test, deny(warnings))]
#![doc(test(attr(deny(warnings))))]
//! # Continuously read and write to memory using random offsets and lengths
//! [RandomAccessMemory] is a complete implementation of [random-access-storage](https://docs.rs/random-access-storage)
//! for in-memory storage.
//!
//! See also [random-access-disk](https://docs.rs/random-access-disk) for on-disk storage
//! that can be swapped with this.
//!
//! ## Examples
//!
//! Reading, writing, deleting and truncating:
//!
//! ```
//! # async_std::task::block_on(async {
//! use random_access_storage::RandomAccess;
//! use random_access_memory::RandomAccessMemory;
//!
//! let mut storage = RandomAccessMemory::default();
//! storage.write(0, b"hello").await.unwrap();
//! storage.write(5, b" world").await.unwrap();
//! assert_eq!(storage.read(0, 11).await.unwrap(), b"hello world");
//! assert_eq!(storage.len().await.unwrap(), 11);
//! storage.del(5, 2).await.unwrap();
//! assert_eq!(storage.read(5, 2).await.unwrap(), [0, 0]);
//! assert_eq!(storage.len().await.unwrap(), 11);
//! storage.truncate(2).await.unwrap();
//! assert_eq!(storage.len().await.unwrap(), 2);
//! storage.truncate(5).await.unwrap();
//! assert_eq!(storage.len().await.unwrap(), 5);
//! assert_eq!(storage.read(0, 5).await.unwrap(), [b'h', b'e', 0, 0, 0]);
//! # })
//! ```
//!
//! To benefit from the swappable interface, you will in most cases
//! want to write generic functions for storage manipulation:
//!
//! ```
//! # async_std::task::block_on(async {
//! use random_access_storage::RandomAccess;
//! use random_access_memory::RandomAccessMemory;
//! use std::fmt::Debug;
//!
//! let mut storage = RandomAccessMemory::default();
//! write_hello_world(&mut storage).await;
//! assert_eq!(read_hello_world(&mut storage).await, b"hello world");
//!
//! /// Write with swappable storage
//! async fn write_hello_world<T>(storage: &mut T)
//! where T: RandomAccess + Debug + Send,
//! {
//!   storage.write(0, b"hello").await.unwrap();
//!   storage.write(5, b" world").await.unwrap();
//! }
//!
//! /// Read with swappable storage
//! async fn read_hello_world<T>(storage: &mut T) -> Vec<u8>
//! where T: RandomAccess + Debug + Send,
//! {
//!   storage.read(0, 11).await.unwrap()
//! }
//! # })
//! ```

pub use intmap::IntMap;

use random_access_storage::{RandomAccess, RandomAccessError};
use std::cmp;

/// In-memory storage for random access
#[derive(Debug)]
pub struct RandomAccessMemory {
  /// Length of each buffer
  page_size: usize,

  /// Allocated memory
  buffers: IntMap<Vec<u8>>,

  /// Total length of the data
  length: u64,
}

impl Default for RandomAccessMemory {
  /// Create a new instance with a 1 MiB page size.
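  ///
  /// A minimal usage sketch, mirroring the crate-level examples (which run
  /// under `async_std`):
  ///
  /// ```
  /// # async_std::task::block_on(async {
  /// use random_access_storage::RandomAccess;
  /// use random_access_memory::RandomAccessMemory;
  ///
  /// let mut storage = RandomAccessMemory::default();
  /// storage.write(0, b"hi").await.unwrap();
  /// assert_eq!(storage.len().await.unwrap(), 2);
  /// # })
  /// ```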
  fn default() -> Self {
    RandomAccessMemory::new(1024 * 1024)
  }
}

#[allow(clippy::needless_range_loop)]
impl RandomAccessMemory {
  /// Create a new instance with `page_size` in bytes.
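  ///
  /// For example, with a deliberately small 1 KiB page size (a sketch in the
  /// style of the crate-level examples):
  ///
  /// ```
  /// # async_std::task::block_on(async {
  /// use random_access_storage::RandomAccess;
  /// use random_access_memory::RandomAccessMemory;
  ///
  /// let mut storage = RandomAccessMemory::new(1024);
  /// storage.write(0, b"hello").await.unwrap();
  /// assert_eq!(storage.read(0, 5).await.unwrap(), b"hello");
  /// # })
  /// ```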
  pub fn new(page_size: usize) -> Self {
    RandomAccessMemory::with_buffers(page_size, IntMap::new())
  }

  /// Create a new instance with `page_size` in bytes, seeded with the given
  /// initial `buffers`.
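  ///
  /// Note that `length` starts at zero regardless of the contents of
  /// `buffers`. A sketch using the re-exported [IntMap]:
  ///
  /// ```
  /// # async_std::task::block_on(async {
  /// use random_access_storage::RandomAccess;
  /// use random_access_memory::{IntMap, RandomAccessMemory};
  ///
  /// let mut buffers = IntMap::new();
  /// buffers.insert(0, vec![0; 1024]);
  /// let mut storage = RandomAccessMemory::with_buffers(1024, buffers);
  /// assert!(storage.is_empty().await.unwrap());
  /// # })
  /// ```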
  pub fn with_buffers(page_size: usize, buffers: IntMap<Vec<u8>>) -> Self {
    RandomAccessMemory {
      page_size,
      buffers,
      length: 0,
    }
  }

  /// Returns the page number and index within that page for a given offset.
  /// If `exclusive_end` is true and the offset falls exactly on a page
  /// boundary, returns the previous page with the page size as index.
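  ///
  /// For example, with a 1024-byte page size, offset `1024` maps to
  /// `(1, 0)`, but to `(0, 1024)` when `exclusive_end` is true.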
  fn page_num_and_index(
    &self,
    offset: u64,
    exclusive_end: bool,
  ) -> (usize, usize) {
    let page_num = (offset / (self.page_size as u64)) as usize;
    let page_index = (offset % (self.page_size as u64)) as usize;
    if page_index == 0 && exclusive_end {
      (if page_num > 0 { page_num - 1 } else { 0 }, self.page_size)
    } else {
      (page_num, page_index)
    }
  }

  /// Zero the given range: partially covered first and last pages are
  /// zeroed in place, fully covered intermediate pages are dropped.
  fn zero(&mut self, offset: u64, length: u64) {
    let (first_page_num, first_page_start) =
      self.page_num_and_index(offset, false);
    let (last_page_num, last_page_end) =
      self.page_num_and_index(offset + length, true);

    // Check if we need to zero bytes in the first page
    if first_page_start > 0
      || (first_page_num == last_page_num && last_page_end > 0)
    {
      if let Some(page) = self.buffers.get_mut(first_page_num as u64) {
        // Need to zero part of the first page
        let begin_page_end = first_page_start
          + cmp::min(length as usize, self.page_size - first_page_start);
        for index in first_page_start..begin_page_end {
          page[index] = 0;
        }
      }
    }

    // Delete intermediate pages
    if last_page_num > first_page_num + 1
      || (first_page_start == 0 && last_page_num == first_page_num + 1)
    {
      let first_page_to_drop = if first_page_start == 0 {
        first_page_num
      } else {
        first_page_num + 1
      };

      for index in first_page_to_drop..last_page_num {
        self.buffers.remove(index as u64);
      }
    }

    // Finally zero the last page
    if last_page_num > first_page_num && last_page_end > 0 {
      if let Some(page) = self.buffers.get_mut(last_page_num as u64) {
        // Need to zero part of the final page
        for index in 0..last_page_end {
          page[index] = 0;
        }
      }
    }
  }
}

#[async_trait::async_trait]
impl RandomAccess for RandomAccessMemory {
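  /// Writing past the current length grows the storage, and any gap between
  /// the old length and the write offset reads back as zeros because pages
  /// are allocated zero-filled. A sketch in the style of the crate-level
  /// examples:
  ///
  /// ```
  /// # async_std::task::block_on(async {
  /// use random_access_storage::RandomAccess;
  /// use random_access_memory::RandomAccessMemory;
  ///
  /// let mut storage = RandomAccessMemory::default();
  /// storage.write(3, b"hi").await.unwrap();
  /// assert_eq!(storage.read(0, 5).await.unwrap(), [0, 0, 0, b'h', b'i']);
  /// # })
  /// ```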
  async fn write(
    &mut self,
    offset: u64,
    data: &[u8],
  ) -> Result<(), RandomAccessError> {
    let new_len = offset + data.len() as u64;
    if new_len > self.length {
      self.length = new_len;
    }

    let mut page_num = (offset / self.page_size as u64) as usize;
    let mut page_cursor =
      (offset - (page_num * self.page_size) as u64) as usize;
    let mut data_cursor = 0;

    // Iterate over the data and write it to the buffers, subslicing whenever
    // the data is bigger than what fits the current page.
    while data_cursor < data.len() {
      let data_bound = data.len() - data_cursor;
      let upper_bound = cmp::min(self.page_size, page_cursor + data_bound);
      let range = page_cursor..upper_bound;
      let range_len = range.len();

      // Allocate a zero-filled page if this one hasn't been written yet.
      if self.buffers.get(page_num as u64).is_none() {
        let buf = vec![0; self.page_size];
        self.buffers.insert(page_num as u64, buf);
      }

      // Copy data from the slice into the page.
      // TODO: use a batch operation such as `.copy_from_slice()` so it can be
      // optimized.
      let buffer = self.buffers.get_mut(page_num as u64).unwrap();
      for (index, buf_index) in range.enumerate() {
        buffer[buf_index] = data[data_cursor + index];
      }

      page_num += 1;
      page_cursor = 0;
      data_cursor += range_len;
    }

    Ok(())
  }

  async fn sync_all(&mut self) -> Result<(), RandomAccessError> {
    // Nothing to flush: all data already lives in memory.
    Ok(())
  }

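  /// Reading a range that ends past the current length fails with
  /// [RandomAccessError::OutOfBounds]:
  ///
  /// ```
  /// # async_std::task::block_on(async {
  /// use random_access_storage::RandomAccess;
  /// use random_access_memory::RandomAccessMemory;
  ///
  /// let mut storage = RandomAccessMemory::default();
  /// assert!(storage.read(0, 1).await.is_err());
  /// # })
  /// ```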
  async fn read(
    &mut self,
    offset: u64,
    length: u64,
  ) -> Result<Vec<u8>, RandomAccessError> {
    if (offset + length) > self.length {
      return Err(RandomAccessError::OutOfBounds {
        offset,
        end: Some(offset + length),
        length: self.length,
      });
    }

    let mut page_num = (offset / self.page_size as u64) as usize;
    let mut page_cursor =
      (offset - (page_num * self.page_size) as u64) as usize;

    let mut res_buf = vec![0; length as usize];
    let mut res_cursor = 0; // Keep track of how many bytes we have read.
    let res_capacity = length;

    while res_cursor < res_capacity {
      let res_bound = res_capacity - res_cursor;
      let page_bound = self.page_size - page_cursor;
      let relative_bound = cmp::min(res_bound, page_bound as u64);
      let upper_bound = page_cursor + relative_bound as usize;
      let range = page_cursor..upper_bound;

      // Fill until either we're done reading the page or we're done filling
      // the buffer, whichever comes first.
      if let Some(buf) = self.buffers.get(page_num as u64) {
        for (index, buf_index) in range.enumerate() {
          res_buf[res_cursor as usize + index] = buf[buf_index];
        }
      }
      // An unallocated page reads as zeros, which `res_buf` already holds.

      res_cursor += relative_bound;
      page_num += 1;
      page_cursor = 0;
    }

    Ok(res_buf)
  }

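  /// Deleting zeroes the range without changing the length; deleting to or
  /// past the end truncates instead. For example:
  ///
  /// ```
  /// # async_std::task::block_on(async {
  /// use random_access_storage::RandomAccess;
  /// use random_access_memory::RandomAccessMemory;
  ///
  /// let mut storage = RandomAccessMemory::default();
  /// storage.write(0, b"hello").await.unwrap();
  /// storage.del(1, 2).await.unwrap();
  /// assert_eq!(storage.read(0, 5).await.unwrap(), [b'h', 0, 0, b'l', b'o']);
  /// assert_eq!(storage.len().await.unwrap(), 5);
  /// storage.del(3, 10).await.unwrap();
  /// assert_eq!(storage.len().await.unwrap(), 3);
  /// # })
  /// ```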
  async fn del(
    &mut self,
    offset: u64,
    length: u64,
  ) -> Result<(), RandomAccessError> {
    if offset > self.length {
      return Err(RandomAccessError::OutOfBounds {
        offset,
        end: None,
        length: self.length,
      });
    }

    if length == 0 {
      // No-op
      return Ok(());
    }

    // Deleting to or past the current length is the same as truncating
    if offset + length >= self.length {
      return self.truncate(offset).await;
    }

    // Deleting means zeroing
    self.zero(offset, length);
    Ok(())
  }

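  /// Truncating never leaves stale bytes behind: shrinking zeroes the tail,
  /// so growing again exposes only zeros. For example:
  ///
  /// ```
  /// # async_std::task::block_on(async {
  /// use random_access_storage::RandomAccess;
  /// use random_access_memory::RandomAccessMemory;
  ///
  /// let mut storage = RandomAccessMemory::default();
  /// storage.write(0, b"hello").await.unwrap();
  /// storage.truncate(2).await.unwrap();
  /// assert_eq!(storage.len().await.unwrap(), 2);
  /// storage.truncate(4).await.unwrap();
  /// assert_eq!(storage.read(0, 4).await.unwrap(), [b'h', b'e', 0, 0]);
  /// # })
  /// ```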
  #[allow(clippy::comparison_chain)]
  async fn truncate(&mut self, length: u64) -> Result<(), RandomAccessError> {
    let (current_last_page_num, _) = self.page_num_and_index(self.length, true);

    if self.length < length {
      let truncate_page_num = (length / self.page_size as u64) as usize;
      // Remove any pages between the old length and the new length that
      // might have been left behind, so the grown region reads as zeros.
      for index in current_last_page_num + 1..=truncate_page_num {
        self.buffers.remove(index as u64);
      }
    } else if self.length > length {
      let delete_length =
        ((current_last_page_num + 1) * self.page_size) - length as usize;
      // Zero everything from the new length to the end of the old last page
      // so no stale data is left behind.
      self.zero(length, delete_length as u64);
    }

    // Set new length
    self.length = length;

    Ok(())
  }

  async fn len(&mut self) -> Result<u64, RandomAccessError> {
    Ok(self.length)
  }

  async fn is_empty(&mut self) -> Result<bool, RandomAccessError> {
    Ok(self.length == 0)
  }
}