use std::fs::{File, OpenOptions};
use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use crossbeam_utils::CachePadded;
use memmap2::Mmap;
use parking_lot::{Mutex, RwLock};
use crate::storage::flush::FlushPolicy;
use crate::storage::meta::{self, MetaHeader};
use crate::{Error, Result};
/// Bytes of fsys framing that precede each record's payload in the journal file.
///
/// NOTE(review): hard-coded to match fsys's on-disk frame layout; it is not read from
/// fsys itself, so it must be updated if that layout changes.
const FSYS_PRE_PAYLOAD_BYTES: u64 = 8;
/// Bytes of fsys framing that follow each record's payload in the journal file.
const FSYS_POST_PAYLOAD_BYTES: u64 = 4;
/// Total framing bytes per record: a record occupies
/// `FSYS_FRAME_OVERHEAD + payload.len()` bytes of the journal.
///
/// Derived from the pre/post constants so the three values cannot drift apart; offset
/// arithmetic in `append_batch` depends on `PRE + POST == OVERHEAD`.
const FSYS_FRAME_OVERHEAD: u64 = FSYS_PRE_PAYLOAD_BYTES + FSYS_POST_PAYLOAD_BYTES;
/// Record store that appends through an fsys journal and serves reads from a
/// read-only memory map of the same file.
///
/// Fields are dropped in declaration order; reorder them with care.
pub(crate) struct Store {
    /// Path of the backing file. The journal and the read handle are opened on it, and
    /// it is passed to `meta::write_with` for metadata writes.
    path: PathBuf,
    /// fsys journal used for appends, syncs, and next-LSN queries.
    journal: Arc<fsys::JournalHandle>,
    /// fsys handle the journal was created from; also used for metadata writes.
    fs: fsys::Handle,
    /// Read-only handle on `path` used to (re)build `mmap`; replaced by `swap_underlying`.
    read_file: Mutex<File>,
    /// Current mapping of `read_file`. Readers clone the `Arc`, so a mapping they hold
    /// stays alive after it is replaced here.
    mmap: RwLock<Arc<Mmap>>,
    /// Length of `mmap`, readable without taking the `mmap` lock; `CachePadded` keeps
    /// it on its own cache line.
    mmap_len: CachePadded<AtomicU64>,
    /// Durability policy; `FlushPolicy::WriteThrough` syncs the journal after each append.
    policy: FlushPolicy,
    /// In-memory metadata header, written back by `persist_meta` (also called on drop).
    meta: Arc<RwLock<MetaHeader>>,
}
impl std::fmt::Debug for Store {
    /// Prints the path, the flush policy, and the journal's next LSN as a raw `u64`.
    /// The journal, file handle, mapping, and metadata fields are not printed.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let next_lsn = self.journal.next_lsn().as_u64();
        let mut builder = f.debug_struct("Store");
        builder.field("path", &self.path);
        builder.field("policy", &self.policy);
        builder.field("next_lsn", &next_lsn);
        builder.finish()
    }
}
impl Store {
pub(crate) fn open(path: PathBuf, flags: u32) -> Result<Self> {
Self::open_with_policy(path, flags, FlushPolicy::default(), None)
}
pub(crate) fn open_with_policy(
path: PathBuf,
flags: u32,
policy: FlushPolicy,
iouring_sqpoll_idle_ms: Option<u32>,
) -> Result<Self> {
let mut fs_builder = fsys::builder().tune_for(fsys::Workload::Database);
if let Some(idle_ms) = iouring_sqpoll_idle_ms {
fs_builder = fs_builder.sqpoll(idle_ms);
}
let fs = fs_builder
.build()
.map_err(|err| Error::Io(std::io::Error::other(format!("fsys init: {err}"))))?;
let meta = match meta::read(&path)? {
Some(existing) => existing,
None => {
let fresh = MetaHeader::fresh(flags);
meta::write_with(&fs, &path, &fresh)?;
fresh
}
};
let journal_opts =
fsys::JournalOptions::new().write_lifetime_hint(Some(fsys::WriteLifetimeHint::Long));
let journal = fs
.journal_with(&path, journal_opts)
.map_err(|err| Error::Io(std::io::Error::other(format!("fsys journal: {err}"))))?;
let journal = Arc::new(journal);
let read_file = OpenOptions::new()
.read(true)
.open(&path)
.map_err(Error::Io)?;
let initial_mmap = unsafe { Mmap::map(&read_file)? };
let mmap_len = initial_mmap.len() as u64;
Ok(Self {
path,
journal,
fs,
read_file: Mutex::new(read_file),
mmap: RwLock::new(Arc::new(initial_mmap)),
mmap_len: CachePadded::new(AtomicU64::new(mmap_len)),
policy,
meta: Arc::new(RwLock::new(meta)),
})
}
pub(crate) fn path(&self) -> &Path {
&self.path
}
pub(crate) fn header(&self) -> Result<MetaHeader> {
Ok(*self.meta.read())
}
pub(crate) fn tail(&self) -> u64 {
self.journal.next_lsn().as_u64()
}
pub(crate) fn mmap(&self) -> Result<Arc<Mmap>> {
Ok(Arc::clone(&self.mmap.read()))
}
pub(crate) fn mmap_covering(&self, end_offset: u64) -> Result<Arc<Mmap>> {
let cur_len = self.mmap_len.load(Ordering::Acquire);
if end_offset > cur_len {
self.refresh_mmap()?;
}
Ok(Arc::clone(&self.mmap.read()))
}
pub(crate) fn append(&self, payload: &[u8]) -> Result<u64> {
let payload_len = payload.len() as u64;
let end_lsn = self
.journal
.append(payload)
.map_err(|err| Error::Io(std::io::Error::other(format!("fsys append: {err}"))))?
.as_u64();
let payload_start = end_lsn - FSYS_POST_PAYLOAD_BYTES - payload_len;
if matches!(self.policy, FlushPolicy::WriteThrough) {
self.journal
.sync_through(fsys::Lsn::new(end_lsn))
.map_err(|err| Error::Io(std::io::Error::other(format!("fsys sync: {err}"))))?;
}
Ok(payload_start)
}
pub(crate) fn append_with<F>(&self, fill_payload: F) -> Result<u64>
where
F: FnOnce(&mut Vec<u8>) -> Result<()>,
{
let mut buf = Vec::with_capacity(64);
fill_payload(&mut buf)?;
self.append(&buf)
}
pub(crate) fn append_batch_with<F>(&self, fill: F) -> Result<Vec<u64>>
where
F: FnOnce(&mut Vec<Vec<u8>>) -> Result<()>,
{
let mut payloads: Vec<Vec<u8>> = Vec::new();
fill(&mut payloads)?;
let slices: Vec<&[u8]> = payloads.iter().map(|v| v.as_slice()).collect();
self.append_batch(slices)
}
pub(crate) fn append_batch<'a, I>(&self, payloads: I) -> Result<Vec<u64>>
where
I: IntoIterator<Item = &'a [u8]>,
{
let payloads: Vec<&[u8]> = payloads.into_iter().collect();
if payloads.is_empty() {
return Ok(Vec::new());
}
let end_lsn = self
.journal
.append_batch(&payloads)
.map_err(|err| Error::Io(std::io::Error::other(format!("fsys append_batch: {err}"))))?
.as_u64();
let total_frame_size: u64 = payloads
.iter()
.map(|p| FSYS_FRAME_OVERHEAD + p.len() as u64)
.sum();
let batch_start = end_lsn - total_frame_size;
let mut starts = Vec::with_capacity(payloads.len());
let mut cursor = batch_start;
for payload in &payloads {
starts.push(cursor + FSYS_PRE_PAYLOAD_BYTES);
cursor += FSYS_FRAME_OVERHEAD + payload.len() as u64;
}
if matches!(self.policy, FlushPolicy::WriteThrough) {
self.journal
.sync_through(fsys::Lsn::new(end_lsn))
.map_err(|err| Error::Io(std::io::Error::other(format!("fsys sync: {err}"))))?;
}
Ok(starts)
}
pub(crate) fn flush(&self) -> Result<()> {
let target = self.journal.next_lsn();
self.journal
.sync_through(target)
.map_err(|err| Error::Io(std::io::Error::other(format!("fsys sync: {err}"))))?;
Ok(())
}
pub(crate) fn persist_meta(&self) -> Result<()> {
let header = *self.meta.read();
meta::write_with(&self.fs, &self.path, &header)?;
Ok(())
}
pub(crate) fn set_encryption_metadata(
&self,
salt: [u8; meta::META_SALT_LEN],
verify: [u8; meta::META_VERIFY_LEN],
) -> Result<()> {
{
let mut guard = self.meta.write();
guard.encryption_salt = salt;
guard.encryption_verify = verify;
guard.flags |= meta::FLAG_ENCRYPTED;
}
self.persist_meta()
}
pub(crate) fn swap_underlying(self: &Arc<Self>, replacement_path: &Path) -> Result<()> {
let mut mmap_guard = self.mmap.write();
let mut file_guard = self.read_file.lock();
let placeholder = OpenOptions::new()
.read(true)
.open(replacement_path)
.map_err(Error::Io)?;
let _old_read_file = std::mem::replace(&mut *file_guard, placeholder);
drop(_old_read_file);
std::fs::rename(replacement_path, &self.path).map_err(Error::Io)?;
let new_read_file = OpenOptions::new()
.read(true)
.open(&self.path)
.map_err(Error::Io)?;
*file_guard = new_read_file;
let new_mmap = unsafe { Mmap::map(&*file_guard)? };
let new_len = new_mmap.len() as u64;
*mmap_guard = Arc::new(new_mmap);
self.mmap_len.store(new_len, Ordering::Release);
Ok(())
}
fn refresh_mmap(&self) -> Result<()> {
let file_guard = self.read_file.lock();
let new_mmap = unsafe { Mmap::map(&*file_guard)? };
let new_len = new_mmap.len() as u64;
drop(file_guard);
let mut mmap_guard = self.mmap.write();
*mmap_guard = Arc::new(new_mmap);
self.mmap_len.store(new_len, Ordering::Release);
Ok(())
}
pub(crate) fn open_reader(&self) -> Result<fsys::JournalReader> {
fsys::JournalReader::open(&self.path)
.map_err(|err| Error::Io(std::io::Error::other(format!("fsys reader: {err}"))))
}
pub(crate) fn fs(&self) -> &fsys::Handle {
&self.fs
}
pub(crate) const fn frame_overhead() -> u64 {
FSYS_FRAME_OVERHEAD
}
pub(crate) const fn pre_payload_bytes() -> u64 {
FSYS_PRE_PAYLOAD_BYTES
}
}
impl Drop for Store {
    /// Writes the in-memory metadata header back via `persist_meta` when the store is
    /// dropped.
    fn drop(&mut self) {
        // A destructor has no way to report failure, so this write is best-effort and
        // any error is deliberately discarded.
        self.persist_meta().unwrap_or(());
    }
}