use std::fs::File;
use std::io::{Read, Seek, SeekFrom, Write};
use std::path::{Path, PathBuf};
use std::sync::{Mutex, OnceLock};
#[cfg(test)]
use std::{cell::Cell, rc::Rc};
use crate::config::OpenMode;
use crate::error::DbError;
pub trait Store {
fn len(&self) -> Result<u64, DbError>;
fn is_empty(&self) -> Result<bool, DbError> {
Ok(self.len()? == 0)
}
fn read_exact_at(&mut self, offset: u64, buf: &mut [u8]) -> Result<(), DbError>;
fn write_all_at(&mut self, offset: u64, buf: &[u8]) -> Result<(), DbError>;
fn sync(&mut self) -> Result<(), DbError>;
fn truncate(&mut self, len: u64) -> Result<(), DbError>;
}
#[derive(Debug)]
struct RawFileStore {
file: File,
}
impl RawFileStore {
fn new(file: File) -> Self {
Self { file }
}
}
impl Store for RawFileStore {
fn len(&self) -> Result<u64, DbError> {
Ok(self.file.metadata()?.len())
}
fn read_exact_at(&mut self, offset: u64, buf: &mut [u8]) -> Result<(), DbError> {
self.file.seek(SeekFrom::Start(offset))?;
self.file.read_exact(buf)?;
Ok(())
}
fn write_all_at(&mut self, offset: u64, buf: &[u8]) -> Result<(), DbError> {
self.file.seek(SeekFrom::Start(offset))?;
self.file.write_all(buf)?;
Ok(())
}
fn sync(&mut self) -> Result<(), DbError> {
self.file.sync_all()?;
Ok(())
}
fn truncate(&mut self, len: u64) -> Result<(), DbError> {
self.file.set_len(len)?;
Ok(())
}
}
#[derive(Debug)]
pub struct FileStore {
inner: crate::pager::PagedStore<RawFileStore>,
_writer_lock: Option<WriterLockGuard>,
_reader_lock: Option<File>,
#[cfg(test)]
test_write_counter: Option<Rc<Cell<usize>>>,
#[cfg(test)]
test_write_budget_remaining: Option<Rc<Cell<usize>>>,
}
#[derive(Debug)]
struct WriterLockState {
_file: File,
refs: usize,
}
static WRITER_LOCKS: OnceLock<Mutex<std::collections::HashMap<PathBuf, WriterLockState>>> =
OnceLock::new();
fn writer_locks() -> &'static Mutex<std::collections::HashMap<PathBuf, WriterLockState>> {
WRITER_LOCKS.get_or_init(|| Mutex::new(std::collections::HashMap::new()))
}
#[derive(Debug)]
struct WriterLockGuard {
lock_path: PathBuf,
}
impl Drop for WriterLockGuard {
fn drop(&mut self) {
let mut g = writer_locks().lock().unwrap_or_else(|e| e.into_inner());
if let Some(st) = g.get_mut(&self.lock_path) {
st.refs = st.refs.saturating_sub(1);
}
if g.get(&self.lock_path).is_some_and(|s| s.refs == 0) {
g.remove(&self.lock_path);
}
}
}
#[cfg(test)]
mod tests {
include!(concat!(
env!("CARGO_MANIFEST_DIR"),
"/tests/unit/src_storage_tests.rs"
));
}
impl FileStore {
pub fn new(file: File) -> Self {
Self {
inner: crate::pager::PagedStore::new(
RawFileStore::new(file),
crate::pager::DEFAULT_PAGE_SIZE,
),
_writer_lock: None,
_reader_lock: None,
#[cfg(test)]
test_write_counter: None,
#[cfg(test)]
test_write_budget_remaining: None,
}
}
#[cfg(test)]
pub(crate) fn new_for_test(
file: File,
write_counter: Option<Rc<Cell<usize>>>,
write_budget_remaining: Option<Rc<Cell<usize>>>,
) -> Self {
Self {
inner: crate::pager::PagedStore::new(
RawFileStore::new(file),
crate::pager::DEFAULT_PAGE_SIZE,
),
_writer_lock: None,
_reader_lock: None,
test_write_counter: write_counter,
test_write_budget_remaining: write_budget_remaining,
}
}
fn lock_path_for_db_path(db_path: &Path) -> PathBuf {
PathBuf::from(format!("{}.writer.lock", db_path.display()))
}
pub fn open_locked(path: impl AsRef<Path>, mode: OpenMode) -> Result<Self, DbError> {
use fs2::FileExt;
let path = path.as_ref();
let file = match mode {
OpenMode::ReadWrite => std::fs::OpenOptions::new()
.read(true)
.write(true)
.create(true)
.truncate(false)
.open(path)?,
OpenMode::ReadOnly => std::fs::OpenOptions::new().read(true).open(path)?,
};
let lock_path = Self::lock_path_for_db_path(path);
let writer_lock = match mode {
OpenMode::ReadOnly => None,
OpenMode::ReadWrite => {
let mut g = writer_locks()
.lock()
.map_err(|_| std::io::Error::other("lock poisoned"))?;
if let Some(st) = g.get_mut(&lock_path) {
st.refs = st.refs.saturating_add(1);
Some(WriterLockGuard {
lock_path: lock_path.clone(),
})
} else {
let lock_file = std::fs::OpenOptions::new()
.read(true)
.write(true)
.create(true)
.truncate(false)
.open(&lock_path)?;
lock_file.try_lock_exclusive()?;
g.insert(
lock_path.clone(),
WriterLockState {
_file: lock_file,
refs: 1,
},
);
Some(WriterLockGuard {
lock_path: lock_path.clone(),
})
}
}
};
let reader_lock = match mode {
OpenMode::ReadWrite => None,
OpenMode::ReadOnly => {
let already_writer = writer_locks()
.lock()
.ok()
.and_then(|g| g.get(&lock_path).map(|_| ()))
.is_some();
if already_writer {
return Ok(Self {
inner: crate::pager::PagedStore::new(
RawFileStore::new(file),
crate::pager::DEFAULT_PAGE_SIZE,
),
_writer_lock: None,
_reader_lock: None,
#[cfg(test)]
test_write_counter: None,
#[cfg(test)]
test_write_budget_remaining: None,
});
}
let lock_file = std::fs::OpenOptions::new()
.read(true)
.write(true)
.create(true)
.truncate(false)
.open(&lock_path)?;
match lock_file.try_lock_shared() {
Ok(()) => Some(lock_file),
Err(std::fs::TryLockError::WouldBlock)
| Err(std::fs::TryLockError::Error(_)) => {
return Err(DbError::Io(std::io::Error::new(
std::io::ErrorKind::WouldBlock,
"database is locked by another process",
)));
}
}
}
};
Ok(Self {
inner: crate::pager::PagedStore::new(
RawFileStore::new(file),
crate::pager::DEFAULT_PAGE_SIZE,
),
_writer_lock: writer_lock,
_reader_lock: reader_lock,
#[cfg(test)]
test_write_counter: None,
#[cfg(test)]
test_write_budget_remaining: None,
})
}
}
impl Store for FileStore {
fn len(&self) -> Result<u64, DbError> {
self.inner.len()
}
fn read_exact_at(&mut self, offset: u64, buf: &mut [u8]) -> Result<(), DbError> {
self.inner.read_exact_at(offset, buf)
}
fn write_all_at(&mut self, offset: u64, buf: &[u8]) -> Result<(), DbError> {
#[cfg(test)]
{
if let Some(c) = &self.test_write_counter {
c.set(c.get().saturating_add(1));
}
if let Some(budget) = &self.test_write_budget_remaining {
let r = budget.get();
if r == 0 {
return Err(DbError::Io(std::io::Error::other(
"FileStore write budget exhausted (test instrumentation)",
)));
}
budget.set(r - 1);
}
}
self.inner.write_all_at(offset, buf)
}
fn sync(&mut self) -> Result<(), DbError> {
self.inner.sync()
}
fn truncate(&mut self, len: u64) -> Result<(), DbError> {
self.inner.truncate(len)
}
}
#[derive(Debug, Default)]
pub struct VecStore {
buf: Vec<u8>,
}
impl VecStore {
pub fn new() -> Self {
Self { buf: Vec::new() }
}
pub fn into_inner(self) -> Vec<u8> {
self.buf
}
pub fn from_vec(buf: Vec<u8>) -> Self {
Self { buf }
}
pub fn as_slice(&self) -> &[u8] {
&self.buf
}
fn ensure_len(&mut self, end: u64) {
let need = end as usize;
if self.buf.len() < need {
self.buf.resize(need, 0);
}
}
}
impl Store for VecStore {
fn len(&self) -> Result<u64, DbError> {
Ok(self.buf.len() as u64)
}
fn read_exact_at(&mut self, offset: u64, buf: &mut [u8]) -> Result<(), DbError> {
let start = offset as usize;
let end = start.saturating_add(buf.len());
if end > self.buf.len() {
return Err(DbError::Io(std::io::Error::new(
std::io::ErrorKind::UnexpectedEof,
"read past end of VecStore",
)));
}
buf.copy_from_slice(&self.buf[start..end]);
Ok(())
}
fn write_all_at(&mut self, offset: u64, data: &[u8]) -> Result<(), DbError> {
let end = offset
.checked_add(data.len() as u64)
.ok_or_else(|| std::io::Error::new(std::io::ErrorKind::InvalidInput, "overflow"))?;
self.ensure_len(end);
let start = offset as usize;
self.buf[start..start + data.len()].copy_from_slice(data);
Ok(())
}
fn sync(&mut self) -> Result<(), DbError> {
Ok(())
}
fn truncate(&mut self, len: u64) -> Result<(), DbError> {
self.buf.truncate(len as usize);
Ok(())
}
}