use std::fs::{File, OpenOptions};
use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, Mutex, RwLock};
use memmap2::Mmap;
use crate::storage::flush::FlushPolicy;
use crate::storage::meta::{self, MetaHeader};
use crate::{Error, Result};
const FSYS_FRAME_OVERHEAD: u64 = 12;
const FSYS_PRE_PAYLOAD_BYTES: u64 = 8;
const FSYS_POST_PAYLOAD_BYTES: u64 = 4;
pub(crate) struct Store {
path: PathBuf,
journal: Arc<fsys::JournalHandle>,
fs: fsys::Handle,
read_file: Mutex<File>,
mmap: RwLock<Arc<Mmap>>,
mmap_len: AtomicU64,
policy: FlushPolicy,
meta: Arc<RwLock<MetaHeader>>,
}
impl std::fmt::Debug for Store {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("Store")
.field("path", &self.path)
.field("policy", &self.policy)
.field("next_lsn", &self.journal.next_lsn().as_u64())
.finish()
}
}
impl Store {
pub(crate) fn open(path: PathBuf, flags: u32) -> Result<Self> {
Self::open_with_policy(path, flags, FlushPolicy::default())
}
pub(crate) fn open_with_policy(path: PathBuf, flags: u32, policy: FlushPolicy) -> Result<Self> {
let meta = match meta::read(&path)? {
Some(existing) => existing,
None => {
let fresh = MetaHeader::fresh(flags);
meta::write(&path, &fresh)?;
fresh
}
};
let fs = fsys::builder()
.build()
.map_err(|err| Error::Io(std::io::Error::other(format!("fsys init: {err}"))))?;
let journal = fs
.journal(&path)
.map_err(|err| Error::Io(std::io::Error::other(format!("fsys journal: {err}"))))?;
let journal = Arc::new(journal);
let read_file = OpenOptions::new()
.read(true)
.open(&path)
.map_err(Error::Io)?;
let initial_mmap = unsafe { Mmap::map(&read_file)? };
let mmap_len = initial_mmap.len() as u64;
Ok(Self {
path,
journal,
fs,
read_file: Mutex::new(read_file),
mmap: RwLock::new(Arc::new(initial_mmap)),
mmap_len: AtomicU64::new(mmap_len),
policy,
meta: Arc::new(RwLock::new(meta)),
})
}
pub(crate) fn path(&self) -> &Path {
&self.path
}
pub(crate) fn header(&self) -> Result<MetaHeader> {
let guard = self.meta.read().map_err(|_| Error::LockPoisoned)?;
Ok(*guard)
}
pub(crate) fn tail(&self) -> u64 {
self.journal.next_lsn().as_u64()
}
pub(crate) fn mmap(&self) -> Result<Arc<Mmap>> {
let guard = self.mmap.read().map_err(|_| Error::LockPoisoned)?;
Ok(Arc::clone(&guard))
}
pub(crate) fn mmap_covering(&self, end_offset: u64) -> Result<Arc<Mmap>> {
let cur_len = self.mmap_len.load(Ordering::Acquire);
if end_offset > cur_len {
self.refresh_mmap()?;
}
let guard = self.mmap.read().map_err(|_| Error::LockPoisoned)?;
Ok(Arc::clone(&guard))
}
pub(crate) fn append(&self, payload: &[u8]) -> Result<u64> {
let payload_len = payload.len() as u64;
let end_lsn = self
.journal
.append(payload)
.map_err(|err| Error::Io(std::io::Error::other(format!("fsys append: {err}"))))?
.as_u64();
let payload_start = end_lsn - FSYS_POST_PAYLOAD_BYTES - payload_len;
if matches!(self.policy, FlushPolicy::WriteThrough) {
self.journal
.sync_through(fsys::Lsn(end_lsn))
.map_err(|err| Error::Io(std::io::Error::other(format!("fsys sync: {err}"))))?;
}
Ok(payload_start)
}
pub(crate) fn append_with<F>(&self, fill_payload: F) -> Result<u64>
where
F: FnOnce(&mut Vec<u8>) -> Result<()>,
{
let mut buf = Vec::with_capacity(64);
fill_payload(&mut buf)?;
self.append(&buf)
}
pub(crate) fn append_batch_with<F>(&self, fill: F) -> Result<Vec<u64>>
where
F: FnOnce(&mut Vec<Vec<u8>>) -> Result<()>,
{
let mut payloads: Vec<Vec<u8>> = Vec::new();
fill(&mut payloads)?;
let slices: Vec<&[u8]> = payloads.iter().map(|v| v.as_slice()).collect();
self.append_batch(slices)
}
pub(crate) fn append_batch<'a, I>(&self, payloads: I) -> Result<Vec<u64>>
where
I: IntoIterator<Item = &'a [u8]>,
{
let payloads: Vec<&[u8]> = payloads.into_iter().collect();
let mut starts = Vec::with_capacity(payloads.len());
let mut last_end_lsn: u64 = 0;
for payload in payloads {
let end_lsn = self
.journal
.append(payload)
.map_err(|err| Error::Io(std::io::Error::other(format!("fsys append: {err}"))))?
.as_u64();
let payload_start = end_lsn - FSYS_POST_PAYLOAD_BYTES - payload.len() as u64;
starts.push(payload_start);
last_end_lsn = end_lsn;
}
if matches!(self.policy, FlushPolicy::WriteThrough) && last_end_lsn > 0 {
self.journal
.sync_through(fsys::Lsn(last_end_lsn))
.map_err(|err| Error::Io(std::io::Error::other(format!("fsys sync: {err}"))))?;
}
Ok(starts)
}
pub(crate) fn flush(&self) -> Result<()> {
let target = self.journal.next_lsn();
self.journal
.sync_through(target)
.map_err(|err| Error::Io(std::io::Error::other(format!("fsys sync: {err}"))))?;
Ok(())
}
pub(crate) fn persist_meta(&self) -> Result<()> {
let header = *self.meta.read().map_err(|_| Error::LockPoisoned)?;
meta::write(&self.path, &header)?;
Ok(())
}
pub(crate) fn set_encryption_metadata(
&self,
salt: [u8; meta::META_SALT_LEN],
verify: [u8; meta::META_VERIFY_LEN],
) -> Result<()> {
{
let mut guard = self.meta.write().map_err(|_| Error::LockPoisoned)?;
guard.encryption_salt = salt;
guard.encryption_verify = verify;
guard.flags |= meta::FLAG_ENCRYPTED;
}
self.persist_meta()
}
pub(crate) fn swap_underlying(self: &Arc<Self>, replacement_path: &Path) -> Result<()> {
let mut mmap_guard = self.mmap.write().map_err(|_| Error::LockPoisoned)?;
let mut file_guard = self.read_file.lock().map_err(|_| Error::LockPoisoned)?;
let placeholder = OpenOptions::new()
.read(true)
.open(replacement_path)
.map_err(Error::Io)?;
let _old_read_file = std::mem::replace(&mut *file_guard, placeholder);
drop(_old_read_file);
std::fs::rename(replacement_path, &self.path).map_err(Error::Io)?;
let new_read_file = OpenOptions::new()
.read(true)
.open(&self.path)
.map_err(Error::Io)?;
*file_guard = new_read_file;
let new_mmap = unsafe { Mmap::map(&*file_guard)? };
let new_len = new_mmap.len() as u64;
*mmap_guard = Arc::new(new_mmap);
self.mmap_len.store(new_len, Ordering::Release);
Ok(())
}
fn refresh_mmap(&self) -> Result<()> {
let file_guard = self.read_file.lock().map_err(|_| Error::LockPoisoned)?;
let new_mmap = unsafe { Mmap::map(&*file_guard)? };
let new_len = new_mmap.len() as u64;
drop(file_guard);
let mut mmap_guard = self.mmap.write().map_err(|_| Error::LockPoisoned)?;
*mmap_guard = Arc::new(new_mmap);
self.mmap_len.store(new_len, Ordering::Release);
Ok(())
}
pub(crate) fn open_reader(&self) -> Result<fsys::JournalReader> {
fsys::JournalReader::open(&self.path)
.map_err(|err| Error::Io(std::io::Error::other(format!("fsys reader: {err}"))))
}
pub(crate) fn fs(&self) -> &fsys::Handle {
&self.fs
}
pub(crate) const fn frame_overhead() -> u64 {
FSYS_FRAME_OVERHEAD
}
pub(crate) const fn pre_payload_bytes() -> u64 {
FSYS_PRE_PAYLOAD_BYTES
}
}
impl Drop for Store {
fn drop(&mut self) {
let _ = self.persist_meta();
}
}