use crate::{
db::{CHUNK_SIZE, EMPTY_RECORD, Record, SavePoint},
node::Node,
};
use bincode::config;
use std::{
fs::File,
io,
ops::{Index, IndexMut, RangeFrom},
sync::*,
};
/// Random-access byte storage abstraction.
///
/// Every method takes `&self`, and implementors must be `Sync + Send`.
pub trait StorageBackend: Sync + Send {
    /// Returns the current length of the storage in bytes.
    fn len(&self) -> Result<u64, io::Error>;

    /// Returns `true` when [`len`](Self::len) reports zero bytes.
    ///
    /// Any error from `len` is passed through unchanged.
    fn is_empty(&self) -> Result<bool, io::Error> {
        self.len().map(|byte_count| byte_count == 0)
    }

    /// Resizes the storage to `len` bytes.
    fn set_len(&self, len: u64) -> Result<(), io::Error>;

    /// Reads `len` bytes starting at byte `offset` and returns them as a new vector.
    fn read(&self, offset: u64, len: usize) -> Result<Vec<u8>, io::Error>;

    /// Asks the backend to persist previously written data.
    fn sync_data(&self) -> Result<(), io::Error>;

    /// Writes all of `data` starting at byte `offset`.
    fn write(&self, offset: u64, data: &[u8]) -> Result<(), io::Error>;
}
/// In-memory storage: a byte vector guarded by an `RwLock`.
///
/// The derived `Default` yields an empty vector.
#[derive(Debug, Default)]
pub struct MemoryBackend(RwLock<Vec<u8>>);
#[cfg(unix)]
use std::os::{fd::AsRawFd, unix::fs::FileExt};
#[cfg(windows)]
use std::os::windows::fs::FileExt;
#[cfg(not(any(windows, unix)))]
use std::sync::Mutex;
/// File-backed storage used on Unix and Windows targets.
#[cfg(any(windows, unix))]
pub struct FileBackend {
    /// File handle that all storage operations go through.
    file: File,
    /// `true` when the constructor took an exclusive `flock` on `file`
    /// (only the Unix `new` does so); the Unix `Drop` impl releases it.
    locked: bool,
}
#[cfg(unix)]
impl FileBackend {
    /// Wraps `file` after taking an exclusive, non-blocking `flock` on it.
    ///
    /// # Errors
    ///
    /// Returns a `WouldBlock` error with the message
    /// "Database already open for writing" when the lock attempt reports
    /// `WouldBlock`; any other OS error is returned as-is.
    pub fn new(file: File) -> Result<Self, io::Error> {
        // SAFETY: the descriptor is borrowed from `file`, which this function
        // owns and keeps open for the whole duration of the call.
        let status = unsafe { libc::flock(file.as_raw_fd(), libc::LOCK_EX | libc::LOCK_NB) };
        if status == 0 {
            return Ok(Self { file, locked: true });
        }

        let os_error = io::Error::last_os_error();
        Err(match os_error.kind() {
            io::ErrorKind::WouldBlock => io::Error::new(
                io::ErrorKind::WouldBlock,
                "Database already open for writing",
            ),
            _ => os_error,
        })
    }

    /// Wraps `file` without taking any lock.
    pub fn read_only(file: File) -> Self {
        Self {
            locked: false,
            file,
        }
    }
}
#[cfg(unix)]
impl Drop for FileBackend {
    /// Releases the `flock` taken by `new`; does nothing for unlocked backends.
    fn drop(&mut self) {
        if !self.locked {
            return;
        }
        // SAFETY: `self.file` is still open here, so its descriptor is valid.
        // The return value is ignored: nothing useful can be done on failure
        // while dropping.
        unsafe {
            libc::flock(self.file.as_raw_fd(), libc::LOCK_UN);
        }
    }
}
#[cfg(unix)]
impl StorageBackend for FileBackend {
    /// Returns the file length reported by the file's metadata.
    fn len(&self) -> Result<u64, io::Error> {
        let metadata = self.file.metadata()?;
        Ok(metadata.len())
    }

    /// Truncates or extends the file to `len` bytes.
    fn set_len(&self, len: u64) -> Result<(), io::Error> {
        self.file.set_len(len)
    }

    /// Reads exactly `len` bytes at `offset` using a positioned read.
    fn read(&self, offset: u64, len: usize) -> Result<Vec<u8>, io::Error> {
        let mut bytes = vec![0; len];
        match self.file.read_exact_at(&mut bytes, offset) {
            Ok(()) => Ok(bytes),
            Err(err) => Err(err),
        }
    }

    /// Delegates to `File::sync_data`.
    fn sync_data(&self) -> Result<(), io::Error> {
        self.file.sync_data()
    }

    /// Writes all of `data` at `offset` using a positioned write.
    fn write(&self, offset: u64, data: &[u8]) -> Result<(), io::Error> {
        self.file.write_all_at(data, offset)
    }
}
#[cfg(windows)]
impl FileBackend {
    /// Wraps `file` for read/write use.
    ///
    /// No lock is taken on this platform, so this always succeeds and the
    /// result is the same value `read_only` produces.
    pub fn new(file: File) -> Result<Self, io::Error> {
        Ok(Self::read_only(file))
    }

    /// Wraps `file` without taking any lock.
    pub fn read_only(file: File) -> Self {
        let locked = false;
        Self { file, locked }
    }
}
#[cfg(windows)]
impl StorageBackend for FileBackend {
    /// Truncates or extends the file to `len` bytes.
    fn set_len(&self, len: u64) -> Result<(), io::Error> {
        self.file.set_len(len)
    }

    /// Returns the file length reported by the file's metadata.
    fn len(&self) -> Result<u64, io::Error> {
        Ok(self.file.metadata()?.len())
    }

    /// Reads exactly `len` bytes starting at `offset`.
    ///
    /// # Errors
    ///
    /// Returns `UnexpectedEof` when the file ends before `len` bytes could be
    /// read, and otherwise forwards the error from `seek_read`.
    fn read(&self, offset: u64, len: usize) -> Result<Vec<u8>, io::Error> {
        let mut buffer = vec![0u8; len];
        let mut filled = 0;
        while filled < buffer.len() {
            match self
                .file
                .seek_read(&mut buffer[filled..], offset + filled as u64)
            {
                // A zero-length read means end of file; without this check the
                // loop would never make progress and spin forever.
                Ok(0) => {
                    return Err(io::Error::new(
                        io::ErrorKind::UnexpectedEof,
                        "failed to fill whole buffer",
                    ));
                }
                Ok(read) => filled += read,
                Err(err) if err.kind() == io::ErrorKind::Interrupted => {}
                Err(err) => return Err(err),
            }
        }
        Ok(buffer)
    }

    /// Delegates to `File::sync_data`.
    fn sync_data(&self) -> Result<(), io::Error> {
        self.file.sync_data()
    }

    /// Writes all of `data` starting at `offset`.
    ///
    /// # Errors
    ///
    /// Returns `WriteZero` when the OS accepts no bytes, and otherwise
    /// forwards the error from `seek_write`.
    fn write(&self, offset: u64, data: &[u8]) -> Result<(), io::Error> {
        let mut written = 0;
        while written < data.len() {
            match self
                .file
                .seek_write(&data[written..], offset + written as u64)
            {
                // No progress is possible; report it instead of looping forever.
                Ok(0) => {
                    return Err(io::Error::new(
                        io::ErrorKind::WriteZero,
                        "failed to write whole buffer",
                    ));
                }
                Ok(count) => written += count,
                Err(err) if err.kind() == io::ErrorKind::Interrupted => {}
                Err(err) => return Err(err),
            }
        }
        Ok(())
    }
}
/// File-backed storage for targets that are neither Unix nor Windows.
///
/// Access goes through a `Mutex` around the file handle.
#[cfg(not(any(windows, unix)))]
pub struct FileBackend {
    /// File handle; locked for the duration of each storage operation.
    file: Mutex<File>,
}
#[cfg(not(any(windows, unix)))]
impl FileBackend {
    /// Wraps `file` for read/write use.
    ///
    /// No file lock is taken on this platform, so this always succeeds. The
    /// error type is `io::Error`, matching the constructors on the other
    /// platforms.
    pub fn new(file: File) -> Result<Self, io::Error> {
        Ok(Self::read_only(file))
    }

    /// Wraps `file` without taking any lock.
    pub fn read_only(file: File) -> Self {
        Self {
            file: Mutex::new(file),
        }
    }
}
#[cfg(not(any(windows, unix)))]
impl StorageBackend for FileBackend {
    /// Truncates or extends the file to `len` bytes.
    fn set_len(&self, len: u64) -> Result<(), io::Error> {
        self.file
            .lock()
            .expect("Could not acquire file lock.")
            .set_len(len)
    }

    /// Returns the file length reported by the file's metadata.
    fn len(&self) -> Result<u64, io::Error> {
        let file = self.file.lock().expect("Could not acquire file lock.");
        Ok(file.metadata()?.len())
    }

    /// Delegates to `File::sync_data`.
    fn sync_data(&self) -> Result<(), io::Error> {
        self.file
            .lock()
            .expect("Could not acquire file lock.")
            .sync_data()
    }

    /// Seeks to `offset` and writes all of `data`.
    ///
    /// The mutex is held across the seek and the write so another caller
    /// cannot move the cursor in between.
    fn write(&self, offset: u64, data: &[u8]) -> Result<(), io::Error> {
        use std::io::{Seek, SeekFrom, Write};

        let mut file = self.file.lock().expect("Could not acquire file lock.");
        file.seek(SeekFrom::Start(offset))?;
        file.write_all(data)
    }

    /// Seeks to `offset` and reads exactly `len` bytes.
    ///
    /// The mutex is held across the seek and the read so another caller
    /// cannot move the cursor in between.
    fn read(&self, offset: u64, len: usize) -> Result<Vec<u8>, io::Error> {
        use std::io::{Read, Seek, SeekFrom};

        let mut result = vec![0u8; len];
        let mut file = self.file.lock().expect("Could not acquire file lock.");
        file.seek(SeekFrom::Start(offset))?;
        file.read_exact(&mut result)?;
        Ok(result)
    }
}
impl MemoryBackend {
    /// Builds the `InvalidInput` error used for offsets or lengths that do not
    /// fit the in-memory buffer.
    fn out_of_range() -> io::Error {
        let kind = io::ErrorKind::InvalidInput;
        io::Error::new(kind, "Index out-of-range.")
    }
}
impl MemoryBackend {
pub fn new() -> Self {
Self::default()
}
fn read(&self) -> RwLockReadGuard<'_, Vec<u8>> {
self.0.read().expect("Could not acquire read lock.")
}
fn write(&self) -> RwLockWriteGuard<'_, Vec<u8>> {
self.0.write().expect("Could not acquire write lock.")
}
}
impl StorageBackend for MemoryBackend {
    /// Returns the number of bytes currently held in memory.
    fn len(&self) -> Result<u64, io::Error> {
        Ok(self.read().len() as u64)
    }

    /// Resizes the buffer to `len` bytes, zero-filling any newly added bytes.
    ///
    /// # Errors
    ///
    /// Returns the out-of-range error when `len` does not fit in `usize`.
    fn set_len(&self, len: u64) -> Result<(), io::Error> {
        let len = usize::try_from(len).map_err(|_| Self::out_of_range())?;
        // `resize` both grows (with zeros) and truncates.
        self.write().resize(len, 0);
        Ok(())
    }

    /// Copies `len` bytes starting at `offset` out of the buffer.
    ///
    /// # Errors
    ///
    /// Returns the out-of-range error when `offset..offset + len` is not fully
    /// inside the buffer, including when that end position overflows `usize`.
    fn read(&self, offset: u64, len: usize) -> Result<Vec<u8>, io::Error> {
        let guard = self.read();
        let start = usize::try_from(offset).map_err(|_| Self::out_of_range())?;
        // Checked so a huge offset reports an error instead of overflowing.
        let end = start.checked_add(len).ok_or_else(Self::out_of_range)?;
        guard
            .get(start..end)
            .map(<[u8]>::to_vec)
            .ok_or_else(Self::out_of_range)
    }

    /// Nothing to persist for an in-memory buffer; always succeeds.
    fn sync_data(&self) -> Result<(), io::Error> {
        Ok(())
    }

    /// Overwrites the bytes at `offset..offset + data.len()` with `data`.
    ///
    /// The buffer is never grown here; callers must `set_len` first.
    ///
    /// # Errors
    ///
    /// Returns the out-of-range error when the target range is not fully
    /// inside the buffer, including when its end overflows `usize`.
    fn write(&self, offset: u64, data: &[u8]) -> Result<(), io::Error> {
        let mut guard = self.write();
        let start = usize::try_from(offset).map_err(|_| Self::out_of_range())?;
        let end = start
            .checked_add(data.len())
            .ok_or_else(Self::out_of_range)?;
        guard
            .get_mut(start..end)
            .ok_or_else(Self::out_of_range)?
            .copy_from_slice(data);
        Ok(())
    }
}
/// Fixed-capacity staging buffer of `SIZE` bytes that appends encoded records
/// to a [`StorageBackend`].
#[allow(clippy::borrowed_box)]
pub struct WriteBuffer<'file, const SIZE: usize> {
    /// Backend that receives the buffered bytes on flush.
    file: &'file Box<dyn StorageBackend>,
    /// Heap-allocated staging area.
    buffer: Box<[u8; SIZE]>,
    /// Number of bytes at the start of `buffer` that hold pending data.
    len: usize,
    /// Backend length as tracked by this buffer; flushed data is written at
    /// this offset and the value is advanced afterwards.
    file_len: u64,
}
impl<'file, const SIZE: usize> WriteBuffer<'file, SIZE> {
#[allow(clippy::borrowed_box)]
pub(crate) fn new(file: &'file Box<dyn StorageBackend>, file_len: u64) -> Self {
Self {
file,
buffer: Box::new([0u8; SIZE]),
len: 0,
file_len,
}
}
fn remaining(&self) -> usize {
SIZE - self.len
}
fn tail(&mut self) -> &mut [u8] {
&mut self.buffer[self.len..]
}
pub(crate) fn flush(&mut self) -> Result<(), io::Error> {
if self.len == 0 {
return Ok(());
}
let aligned_len = self.len - (self.len % CHUNK_SIZE as usize);
if aligned_len > 0 {
self.file.set_len(self.file_len + aligned_len as u64)?;
self.file
.write(self.file_len, &self.buffer[0..aligned_len])?;
self.file_len += aligned_len as u64;
}
if aligned_len < self.len {
let remaining_len = self.len - aligned_len;
self.buffer.copy_within(aligned_len..self.len, 0);
self.buffer[remaining_len..CHUNK_SIZE as usize].fill(0);
self.file.set_len(self.file_len + CHUNK_SIZE)?;
self.file
.write(self.file_len, &self.buffer[0..CHUNK_SIZE as usize])?;
self.file_len += CHUNK_SIZE;
}
self.len = 0;
Ok(())
}
pub fn write_save_point(&mut self, save_point: &SavePoint) -> Result<Record, io::Error> {
let config = config::standard();
let size = bincode::encode_into_slice(save_point, self.tail(), config)
.map_err(|e| io::Error::other(format!("Failed to encode save point: {}", e)))?;
let record = Record {
offset: self.file_len + self.len as u64,
size: size as u32,
};
self.len += size;
Ok(record)
}
pub fn write_node(&mut self, node: &mut Node) -> Result<Record, io::Error> {
if self.remaining() < node.mem_size() {
self.flush()?;
}
let config = config::standard();
if node.inner.is_none() {
if node.id != EMPTY_RECORD {
return Ok(node.id);
}
return Err(io::Error::new(io::ErrorKind::NotFound, "Node not found"));
}
let size = {
let inner = node.inner.as_mut().unwrap();
bincode::encode_into_slice(inner, self.tail(), config)
.map_err(|e| io::Error::other(format!("Failed to encode node: {}", e)))?
};
let node_id = Record {
offset: self.file_len + self.len as u64,
size: size as u32,
};
self.len += size;
Ok(node_id)
}
}
/// Read access to a single byte of the staging buffer.
impl<'file, const SIZE: usize> Index<usize> for WriteBuffer<'file, SIZE> {
    type Output = u8;

    /// Returns the byte at `index`; panics when `index` is outside the buffer.
    fn index(&self, index: usize) -> &Self::Output {
        let bytes: &[u8] = self.buffer.as_slice();
        &bytes[index]
    }
}
/// Read access to a bounded byte range of the staging buffer.
impl<'file, const SIZE: usize> Index<std::ops::Range<usize>> for WriteBuffer<'file, SIZE> {
    type Output = [u8];

    /// Returns the bytes in `range`; panics when the range leaves the buffer.
    fn index(&self, range: std::ops::Range<usize>) -> &Self::Output {
        let bytes: &[u8] = self.buffer.as_slice();
        &bytes[range]
    }
}
/// Write access to a single byte of the staging buffer.
impl<'file, const SIZE: usize> IndexMut<usize> for WriteBuffer<'file, SIZE> {
    /// Returns the byte at `index` mutably; panics when `index` is outside
    /// the buffer.
    fn index_mut(&mut self, index: usize) -> &mut Self::Output {
        let bytes: &mut [u8] = self.buffer.as_mut_slice();
        &mut bytes[index]
    }
}
/// Write access to a bounded byte range of the staging buffer.
impl<'file, const SIZE: usize> IndexMut<std::ops::Range<usize>> for WriteBuffer<'file, SIZE> {
    /// Returns the bytes in `range` mutably; panics when the range leaves the
    /// buffer.
    fn index_mut(&mut self, range: std::ops::Range<usize>) -> &mut Self::Output {
        let bytes: &mut [u8] = self.buffer.as_mut_slice();
        &mut bytes[range]
    }
}
/// Read access to the staging buffer from a start position to its end.
impl<'file, const SIZE: usize> Index<RangeFrom<usize>> for WriteBuffer<'file, SIZE> {
    type Output = [u8];

    /// Returns every byte from `range.start` onwards; panics when the start
    /// lies beyond the end of the buffer.
    fn index(&self, range: RangeFrom<usize>) -> &Self::Output {
        let bytes: &[u8] = self.buffer.as_slice();
        &bytes[range]
    }
}
/// Write access to the staging buffer from a start position to its end.
impl<'file, const SIZE: usize> IndexMut<RangeFrom<usize>> for WriteBuffer<'file, SIZE> {
    /// Returns every byte from `range.start` onwards mutably; panics when the
    /// start lies beyond the end of the buffer.
    fn index_mut(&mut self, range: RangeFrom<usize>) -> &mut Self::Output {
        let bytes: &mut [u8] = self.buffer.as_mut_slice();
        &mut bytes[range]
    }
}