#![cfg_attr(feature = "nightly", deny(missing_docs))]
#![cfg_attr(feature = "nightly", feature(external_doc))]
#![cfg_attr(feature = "nightly", doc(include = "../README.md"))]
#![cfg_attr(test, deny(warnings))]
use anyhow::anyhow;
use random_access_storage::RandomAccess;
use std::cmp;
/// In-memory random-access storage split into fixed-size pages.
///
/// Data lives in a vector of pages; byte `n` of the storage is byte
/// `n % page_size` of page `n / page_size`.
#[derive(Debug)]
pub struct RandomAccessMemory {
    /// Number of bytes held by every page. Must be greater than zero: the
    /// read and write paths divide offsets by this value.
    page_size: usize,
    /// Pages indexed by page number.
    buffers: Vec<Vec<u8>>,
    /// Logical length of the stored data in bytes; reads past it are rejected.
    length: u64,
}

impl RandomAccessMemory {
    /// Creates an empty instance whose pages hold `page_size` bytes each.
    ///
    /// `page_size` must be greater than zero.
    pub fn new(page_size: usize) -> Self {
        Self::with_buffers(page_size, Vec::new())
    }

    /// Creates an instance that starts out owning the given pages.
    ///
    /// The logical length still starts at `0`, so bytes already present in
    /// `buffers` are not readable until the length grows past them (for
    /// example through a later write).
    ///
    /// NOTE(review): presumably every page is expected to be exactly
    /// `page_size` bytes long, and starting at length `0` may or may not be
    /// intended - confirm against callers before relying on either.
    pub fn with_buffers(page_size: usize, buffers: Vec<Vec<u8>>) -> Self {
        RandomAccessMemory {
            page_size,
            buffers,
            length: 0,
        }
    }
}

impl Default for RandomAccessMemory {
    /// Creates an empty instance with a page size of 1 MiB (`1024 * 1024`
    /// bytes).
    fn default() -> Self {
        Self::new(1024 * 1024)
    }
}
impl RandomAccessMemory {
    /// Returns the page size as a `u64`, or an error when it is zero.
    ///
    /// Every offset computation divides by the page size, so a zero value is
    /// reported as an error instead of panicking on a division by zero.
    fn checked_page_size(
        &self,
    ) -> Result<u64, Box<dyn std::error::Error + Send + Sync>> {
        if self.page_size == 0 {
            return Err(anyhow!("Page size must be greater than zero").into());
        }
        Ok(self.page_size as u64)
    }
}

#[async_trait::async_trait]
impl RandomAccess for RandomAccessMemory {
    type Error = Box<dyn std::error::Error + Send + Sync>;

    /// Writes `data` at byte `offset`, allocating zero-filled pages as needed.
    ///
    /// The logical length grows to `offset + data.len()` when that exceeds
    /// the current length (this also happens for empty `data`). Every page
    /// up to the last one touched is allocated at full size, so a write at a
    /// very large offset allocates all pages in between.
    ///
    /// # Errors
    ///
    /// Fails when the page size is zero or when `offset + data.len()` does
    /// not fit in a `u64`.
    async fn write(
        &mut self,
        offset: u64,
        data: &[u8],
    ) -> Result<(), Self::Error> {
        let page_size = self.page_size;
        let page_size_u64 = self.checked_page_size()?;
        let new_len =
            offset.checked_add(data.len() as u64).ok_or_else(|| {
                anyhow!("Write bounds overflow. {} + {}", offset, data.len())
            })?;

        let mut page_num = (offset / page_size_u64) as usize;
        let mut page_cursor = (offset % page_size_u64) as usize;
        let mut remaining = data;

        while !remaining.is_empty() {
            if self.buffers.len() <= page_num {
                self.buffers.resize(page_num + 1, vec![0; page_size]);
            }
            let page = &mut self.buffers[page_num];
            // Pages handed in through `with_buffers` may be shorter than a
            // full page; pad them so the slice below is always in bounds.
            if page.len() < page_size {
                page.resize(page_size, 0);
            }

            let chunk_len = cmp::min(page_size - page_cursor, remaining.len());
            let (chunk, rest) = remaining.split_at(chunk_len);
            page[page_cursor..page_cursor + chunk_len].copy_from_slice(chunk);

            remaining = rest;
            page_num += 1;
            // Only the first page can start mid-page.
            page_cursor = 0;
        }

        if new_len > self.length {
            self.length = new_len;
        }
        Ok(())
    }

    /// No-op: all data already lives in memory.
    async fn sync_all(&mut self) -> Result<(), Self::Error> {
        Ok(())
    }

    /// Reads `length` bytes starting at byte `offset`.
    ///
    /// Bytes that fall in pages which were never allocated read as zero.
    ///
    /// # Errors
    ///
    /// Fails when the page size is zero or when `offset + length` exceeds
    /// the logical length (including when the sum overflows a `u64`).
    async fn read(
        &mut self,
        offset: u64,
        length: u64,
    ) -> Result<Vec<u8>, Self::Error> {
        let page_size = self.page_size;
        let page_size_u64 = self.checked_page_size()?;
        match offset.checked_add(length) {
            Some(end) if end <= self.length => {}
            _ => {
                return Err(
                    anyhow!(
                        "Read bounds exceeded. {} < {}..{}",
                        self.length,
                        offset,
                        offset.saturating_add(length)
                    )
                    .into(),
                );
            }
        }

        let mut page_num = (offset / page_size_u64) as usize;
        let mut page_cursor = (offset % page_size_u64) as usize;
        // `length` is bounded by the logical length checked above.
        let mut res_buf = vec![0; length as usize];
        let mut res_cursor = 0;

        while res_cursor < res_buf.len() {
            let page = match self.buffers.get(page_num) {
                Some(page) => page,
                // Pages are stored contiguously, so every later page is
                // missing too; the rest of the result stays zero-filled.
                None => break,
            };
            let chunk_len =
                cmp::min(page_size - page_cursor, res_buf.len() - res_cursor);
            // A page may be shorter than `page_size` (see `with_buffers`);
            // bytes it does not hold read as zero.
            let available = page.get(page_cursor..).unwrap_or(&[]);
            let copy_len = cmp::min(chunk_len, available.len());
            res_buf[res_cursor..res_cursor + copy_len]
                .copy_from_slice(&available[..copy_len]);

            res_cursor += chunk_len;
            page_num += 1;
            page_cursor = 0;
        }
        Ok(res_buf)
    }

    /// Not implemented.
    ///
    /// # Panics
    ///
    /// Always panics via `unimplemented!()`.
    async fn read_to_writer(
        &mut self,
        _offset: u64,
        _length: u64,
        _buf: &mut (impl futures::io::AsyncWrite + Send),
    ) -> Result<(), Self::Error> {
        unimplemented!()
    }

    /// Clears every page that lies entirely inside `offset..offset + length`.
    ///
    /// Cleared pages are zeroed in place, so page numbering and the logical
    /// length stay unchanged. Pages only partially covered by the range are
    /// left untouched.
    ///
    /// # Errors
    ///
    /// Fails when the page size is zero.
    async fn del(&mut self, offset: u64, length: u64) -> Result<(), Self::Error> {
        let page_size = self.checked_page_size()?;
        let overflow = offset % page_size;
        // Bytes to skip to reach the next page boundary.
        let inc = if overflow == 0 { 0 } else { page_size - overflow };
        if inc >= length {
            // The range ends before it covers a whole page.
            return Ok(());
        }
        let aligned = match offset.checked_add(inc) {
            Some(aligned) => aligned,
            // No page starts at or after `offset`.
            None => return Ok(()),
        };

        let whole_pages = (length - inc) / page_size;
        let first_page = aligned / page_size;
        let page_count = self.buffers.len() as u64;
        // Clamp to the allocated pages; missing pages already read as zero.
        let first = cmp::min(first_page, page_count) as usize;
        let last =
            cmp::min(first_page.saturating_add(whole_pages), page_count) as usize;

        for page in &mut self.buffers[first..last] {
            for byte in page.iter_mut() {
                *byte = 0;
            }
        }
        Ok(())
    }

    /// Sets the logical length to `length`.
    ///
    /// When shrinking, pages past the new end are dropped and the remainder
    /// of the boundary page is zeroed. When growing, only the length changes.
    ///
    /// # Errors
    ///
    /// Fails when the page size is zero.
    async fn truncate(&mut self, length: u64) -> Result<(), Self::Error> {
        let page_size = self.checked_page_size()?;
        if length < self.length {
            let full_pages = length / page_size;
            let tail = (length % page_size) as usize;
            // Number of pages needed to hold `length` bytes.
            let keep = if tail == 0 { full_pages } else { full_pages + 1 };
            let page_count = self.buffers.len() as u64;
            if keep < page_count {
                self.buffers.truncate(keep as usize);
            }
            if tail != 0 && full_pages < page_count {
                if let Some(page) = self.buffers.get_mut(full_pages as usize) {
                    for byte in page.iter_mut().skip(tail) {
                        *byte = 0;
                    }
                }
            }
        }
        self.length = length;
        Ok(())
    }

    /// Returns the logical length in bytes.
    async fn len(&self) -> Result<u64, Self::Error> {
        Ok(self.length)
    }

    /// Returns `true` when the logical length is zero.
    async fn is_empty(&mut self) -> Result<bool, Self::Error> {
        Ok(self.length == 0)
    }
}