use crate::batch::Batch;
use crate::buffer::AlignedBufferPool;
use crate::error::BatchError;
use crate::method::Method;
use crate::path::Mode;
use crate::pipeline::{BatchOp, HandleSnapshot, Pipeline};
use crate::{Error, Result};
use std::path::{Path, PathBuf};
use std::sync::atomic::AtomicU64;
use std::sync::atomic::{AtomicU8, Ordering};
use std::sync::Mutex;
#[cfg(target_os = "linux")]
use crate::platform::linux_iouring::{IoUringRing, NvmeAccess};
#[cfg(target_os = "linux")]
use std::sync::Arc;
#[cfg(target_os = "windows")]
use crate::platform::windows_nvme::NvmeAccess as WinNvmeAccess;
#[cfg(target_os = "windows")]
use std::sync::Arc as WinArc;
/// Lazily-initialised state of the handle's io_uring ring (Linux only).
///
/// The state starts as `Untried` and is moved to `Active` or `Disabled` by
/// the first call to `Handle::io_uring_ring`; after that it is never probed
/// again.
#[cfg(target_os = "linux")]
enum IoUringState {
/// No attempt to create a ring has been made yet.
Untried,
/// A ring was created and is shared through the `Arc`.
Active(Arc<IoUringRing>),
/// Ring creation failed once; callers receive `None` without a retry.
Disabled,
}
/// Lazily-initialised state of the NVMe flush probe (Linux only).
///
/// The state starts as `Untried` and is settled by the first call to
/// `Handle::nvme_access`.
#[cfg(target_os = "linux")]
enum NvmeState {
/// The probe has not run yet.
Untried,
/// The probe returned an access object, shared through the `Arc`.
Active(Arc<NvmeAccess>),
/// The probe returned nothing; callers receive `None` without a retry.
Disabled,
}
/// Lazily-initialised state of the NVMe flush probe (Windows only).
///
/// The state starts as `Untried` and is settled by the first call to
/// `Handle::nvme_access_win`.
#[cfg(target_os = "windows")]
enum NvmeStateWin {
/// The probe has not run yet.
Untried,
/// The probe returned an access object, shared through the `Arc`.
Active(WinArc<WinNvmeAccess>),
/// The probe returned nothing; callers receive `None` without a retry.
Disabled,
}
/// Arguments forwarded to `AlignedBufferPool::new` when a handle builds its
/// buffer pool on first use.
#[derive(Clone, Copy)]
pub(crate) struct HandleBufferPoolConfig {
/// First argument to `AlignedBufferPool::new`.
/// NOTE(review): presumably the number of blocks in the pool — confirm.
pub capacity: usize,
/// Second argument to `AlignedBufferPool::new`.
/// NOTE(review): presumably the size of one block in bytes — confirm.
pub block_size: usize,
/// Third argument to `AlignedBufferPool::new`.
/// NOTE(review): presumably the block alignment in bytes — confirm.
pub block_align: usize,
}
/// Process-wide counter bumped by `Handle::gen_temp_path`, so each generated
/// temporary file name within this process carries a distinct number.
static WRITE_COUNTER: AtomicU64 = AtomicU64::new(0);
/// State behind the crate's I/O entry points: the configured and active
/// durability methods, an optional root directory, the batch pipeline, and
/// lazily-created platform resources.
pub struct Handle {
/// `Method::to_u8` encoding of the method the caller asked for.
configured_method: AtomicU8,
/// `Method::to_u8` encoding of the method currently in effect.
active_method: AtomicU8,
/// When set, `resolve_path` confines every path to this directory.
root: Option<PathBuf>,
/// Mode returned by `mode()`; fixed at construction.
mode: Mode,
/// Sector size returned by `sector_size()` and copied into snapshots.
sector_size: u32,
/// Pipeline that receives every submitted batch.
pipeline: Pipeline,
/// Arguments used to build the buffer pool on first use.
pool_config: HandleBufferPoolConfig,
/// Buffer pool; `None` until `buffer_pool()` first succeeds.
pool_slot: Mutex<Option<AlignedBufferPool>>,
/// Queue depth passed to `IoUringRing::new` when the ring is created.
#[cfg(target_os = "linux")]
iouring_queue_depth: u32,
/// Lazily-created io_uring ring (see `io_uring_ring`).
#[cfg(target_os = "linux")]
iouring_slot: Mutex<IoUringState>,
/// Lazily-probed NVMe access (see `nvme_access`).
#[cfg(target_os = "linux")]
nvme_slot: Mutex<NvmeState>,
/// Lazily-probed NVMe access (see `nvme_access_win`).
#[cfg(target_os = "windows")]
nvme_slot_win: Mutex<NvmeStateWin>,
}
impl Handle {
/// Assembles a `Handle` from already-decided parts.
///
/// Both methods are stored as their `to_u8` encodings. The buffer pool is
/// left unallocated and every platform-specific slot starts as `Untried`.
/// `iouring_queue_depth` is only stored on Linux; elsewhere it is ignored.
#[cfg_attr(not(target_os = "linux"), allow(unused_variables))]
#[allow(clippy::too_many_arguments)]
pub(crate) fn new_raw(
    configured_method: Method,
    active_method: Method,
    root: Option<PathBuf>,
    mode: Mode,
    sector_size: u32,
    pipeline: Pipeline,
    pool_config: HandleBufferPoolConfig,
    iouring_queue_depth: u32,
) -> Self {
    // Encode the methods up front so the struct literal can use shorthand.
    let configured_method = AtomicU8::new(configured_method.to_u8());
    let active_method = AtomicU8::new(active_method.to_u8());
    let pool_slot = Mutex::new(None);

    Self {
        configured_method,
        active_method,
        root,
        mode,
        sector_size,
        pipeline,
        pool_config,
        pool_slot,
        #[cfg(target_os = "linux")]
        iouring_queue_depth,
        #[cfg(target_os = "linux")]
        iouring_slot: Mutex::new(IoUringState::Untried),
        #[cfg(target_os = "linux")]
        nvme_slot: Mutex::new(NvmeState::Untried),
        #[cfg(target_os = "windows")]
        nvme_slot_win: Mutex::new(NvmeStateWin::Untried),
    }
}
/// Returns the handle's NVMe access object, probing for it on first call.
///
/// The first call runs `windows_nvme::nvme_flush_capable(path)` and caches
/// the outcome; later calls return the cached result and ignore `path`.
/// A failed probe is remembered, so it is never retried. A poisoned lock is
/// recovered rather than propagated.
#[cfg(target_os = "windows")]
pub(crate) fn nvme_access_win(&self, path: &Path) -> Option<WinArc<WinNvmeAccess>> {
    let mut slot = self
        .nvme_slot_win
        .lock()
        .unwrap_or_else(|poisoned| poisoned.into_inner());

    if let NvmeStateWin::Active(existing) = &*slot {
        return Some(WinArc::clone(existing));
    }
    if matches!(*slot, NvmeStateWin::Disabled) {
        return None;
    }

    // Still `Untried`: probe once and record whichever way it went.
    let probed = crate::platform::windows_nvme::nvme_flush_capable(path).map(WinArc::new);
    *slot = probed
        .as_ref()
        .map_or(NvmeStateWin::Disabled, |access| {
            NvmeStateWin::Active(WinArc::clone(access))
        });
    probed
}
#[cfg(target_os = "linux")]
pub(crate) fn nvme_access(&self, fd: std::os::fd::RawFd) -> Option<Arc<NvmeAccess>> {
let mut guard = match self.nvme_slot.lock() {
Ok(g) => g,
Err(p) => p.into_inner(),
};
match &*guard {
NvmeState::Active(a) => return Some(a.clone()),
NvmeState::Disabled => return None,
NvmeState::Untried => {}
}
match crate::platform::linux_iouring::nvme_flush_capable(fd) {
Some(access) => {
let arc = Arc::new(access);
*guard = NvmeState::Active(arc.clone());
Some(arc)
}
None => {
*guard = NvmeState::Disabled;
None
}
}
}
/// Names the durability primitive that corresponds to the active method on
/// the current platform.
///
/// The result is one of the `crate::primitive` constants. For
/// `Method::Direct` on Linux and Windows the answer also depends on which
/// lazily-probed facilities are currently active. Any method without a
/// dedicated arm reports `FSYNC`.
#[must_use]
pub fn active_durability_primitive(&self) -> &'static str {
    match self.active_method() {
        Method::Mmap => crate::primitive::MMAP_MSYNC,
        Method::Sync => {
            #[cfg(target_os = "macos")]
            let primitive: &'static str = crate::primitive::F_FULLFSYNC;
            #[cfg(not(target_os = "macos"))]
            let primitive: &'static str = crate::primitive::FSYNC;
            primitive
        }
        Method::Data => {
            #[cfg(target_os = "linux")]
            let primitive: &'static str = crate::primitive::FDATASYNC;
            #[cfg(target_os = "macos")]
            let primitive: &'static str = crate::primitive::F_FULLFSYNC;
            // Windows and every other platform both report plain FSYNC.
            #[cfg(not(any(target_os = "linux", target_os = "macos")))]
            let primitive: &'static str = crate::primitive::FSYNC;
            primitive
        }
        Method::Direct => {
            #[cfg(target_os = "linux")]
            let primitive: &'static str = self.linux_direct_primitive();
            #[cfg(target_os = "macos")]
            let primitive: &'static str = crate::primitive::F_NOCACHE_F_FULLFSYNC;
            #[cfg(target_os = "windows")]
            let primitive: &'static str = self.windows_direct_primitive();
            #[cfg(not(any(target_os = "linux", target_os = "macos", target_os = "windows")))]
            let primitive: &'static str = crate::primitive::FSYNC;
            primitive
        }
        _ => crate::primitive::FSYNC,
    }
}
/// Picks the primitive name for `Method::Direct` on Linux.
///
/// An active NVMe slot wins; otherwise an active io_uring ring; otherwise
/// the plain `O_DIRECT` path. Only the current slot states are inspected;
/// nothing is probed here. Poisoned locks are recovered.
#[cfg(target_os = "linux")]
fn linux_direct_primitive(&self) -> &'static str {
    {
        let nvme = self
            .nvme_slot
            .lock()
            .unwrap_or_else(|poisoned| poisoned.into_inner());
        if let NvmeState::Active(_) = *nvme {
            return crate::primitive::IO_URING_NVME_FLUSH;
        }
        // The NVMe guard is released here, before the ring lock is taken.
    }

    let ring = self
        .iouring_slot
        .lock()
        .unwrap_or_else(|poisoned| poisoned.into_inner());
    match *ring {
        IoUringState::Active(_) => crate::primitive::IO_URING_FDATASYNC,
        IoUringState::Untried | IoUringState::Disabled => {
            crate::primitive::O_DIRECT_PWRITE_FDATASYNC
        }
    }
}
/// Picks the primitive name for `Method::Direct` on Windows.
///
/// Reports the NVMe-IOCTL variant when the NVMe slot is currently active,
/// and plain write-through otherwise. Only the current slot state is
/// inspected; nothing is probed here. A poisoned lock is recovered.
#[cfg(target_os = "windows")]
fn windows_direct_primitive(&self) -> &'static str {
    let slot = self
        .nvme_slot_win
        .lock()
        .unwrap_or_else(|poisoned| poisoned.into_inner());
    match *slot {
        NvmeStateWin::Active(_) => crate::primitive::FILE_FLAG_WRITE_THROUGH_NVME_IOCTL,
        NvmeStateWin::Untried | NvmeStateWin::Disabled => {
            crate::primitive::FILE_FLAG_WRITE_THROUGH
        }
    }
}
#[cfg(target_os = "linux")]
pub(crate) fn io_uring_ring(&self) -> Option<Arc<IoUringRing>> {
let mut guard = match self.iouring_slot.lock() {
Ok(g) => g,
Err(p) => p.into_inner(),
};
match &*guard {
IoUringState::Active(r) => return Some(r.clone()),
IoUringState::Disabled => return None,
IoUringState::Untried => {}
}
match IoUringRing::new(self.iouring_queue_depth) {
Ok(ring) => {
let arc = Arc::new(ring);
*guard = IoUringState::Active(arc.clone());
Some(arc)
}
Err(_) => {
*guard = IoUringState::Disabled;
None
}
}
}
#[allow(dead_code)] pub(crate) fn buffer_pool(&self) -> Result<AlignedBufferPool> {
let mut guard = match self.pool_slot.lock() {
Ok(g) => g,
Err(p) => p.into_inner(),
};
if let Some(pool) = guard.as_ref() {
return Ok(pool.clone());
}
let pool = AlignedBufferPool::new(
self.pool_config.capacity,
self.pool_config.block_size,
self.pool_config.block_align,
)?;
*guard = Some(pool.clone());
Ok(pool)
}
/// Returns the configured method, as given at construction or by the last
/// successful `set_method` call.
#[must_use]
pub fn method(&self) -> Method {
    let encoded = self.configured_method.load(Ordering::Relaxed);
    Method::from_u8(encoded)
}
/// Returns the method currently in effect, as last stored by construction,
/// `set_method`, or `update_active_method`.
#[must_use]
pub fn active_method(&self) -> Method {
    let encoded = self.active_method.load(Ordering::Relaxed);
    Method::from_u8(encoded)
}
pub fn set_method(&self, method: Method) -> Result<()> {
if method.is_reserved() {
return Err(Error::UnsupportedMethod {
method: method.as_str(),
});
}
let resolved = method.resolve();
self.configured_method
.store(method.to_u8(), Ordering::Relaxed);
self.active_method
.store(resolved.to_u8(), Ordering::Relaxed);
Ok(())
}
/// Returns the root directory that paths are confined to, if one was set.
#[must_use]
pub fn root(&self) -> Option<&Path> {
    self.root.as_ref().map(|dir| dir.as_path())
}
#[must_use]
pub fn mode(&self) -> Mode {
self.mode
}
#[must_use]
pub fn sector_size(&self) -> u32 {
self.sector_size
}
/// Overwrites the active method without touching the configured one.
pub(crate) fn update_active_method(&self, method: Method) {
    let encoded = method.to_u8();
    self.active_method.store(encoded, Ordering::Relaxed);
}
/// Reports whether the active method is `Method::Direct`.
pub(crate) fn use_direct(&self) -> bool {
    matches!(self.active_method(), Method::Direct)
}
pub(crate) fn resolve_path(&self, path: &Path) -> Result<PathBuf> {
let Some(root) = &self.root else {
return Ok(path.to_owned());
};
let candidate = if path.is_absolute() {
path.to_owned()
} else {
root.join(path)
};
let mut resolved = PathBuf::new();
for component in candidate.components() {
use std::path::Component;
match component {
Component::Prefix(p) => {
resolved.push(p.as_os_str());
}
Component::RootDir => {
resolved.push(component);
}
Component::CurDir => {
}
Component::ParentDir => {
if !resolved.pop() {
return Err(Error::InvalidPath {
path: path.to_owned(),
reason: "path escapes the handle root".into(),
});
}
}
Component::Normal(n) => {
resolved.push(n);
}
}
}
if !resolved.starts_with(root) {
return Err(Error::InvalidPath {
path: path.to_owned(),
reason: "path escapes the handle root".into(),
});
}
Ok(resolved)
}
/// Builds a sibling temporary path for `path` of the form
/// `<parent>/.fsys-tmp-<n>.<file name>`.
///
/// `<n>` comes from the process-wide `WRITE_COUNTER`, so successive calls
/// in one process yield distinct names. When `path` has no parent, `.` is
/// used; when it has no file name, the suffix after the final dot is empty.
///
/// The file name is appended as an `OsStr`, so names that are not valid
/// UTF-8 are carried over unchanged instead of being lossily re-encoded.
pub(crate) fn gen_temp_path(path: &Path) -> PathBuf {
    let sequence = WRITE_COUNTER.fetch_add(1, Ordering::Relaxed);

    let mut name = std::ffi::OsString::from(format!(".fsys-tmp-{sequence}."));
    if let Some(file_name) = path.file_name() {
        name.push(file_name);
    }

    match path.parent() {
        Some(parent) => parent.join(name),
        None => Path::new(".").join(name),
    }
}
/// Submits one batch that writes each `(path, data)` pair.
///
/// Every path is first resolved with `resolve_path`, and each payload is
/// copied into the operation. Nothing is submitted unless all paths resolve.
///
/// # Errors
///
/// If a path fails to resolve, returns a `BatchError` whose `failed_at` is
/// that entry's index and whose `completed` is 0. Otherwise returns
/// whatever the pipeline reports.
pub fn write_batch<P: AsRef<Path>>(
    &self,
    batch: &[(P, &[u8])],
) -> std::result::Result<(), BatchError> {
    let ops = batch
        .iter()
        .enumerate()
        .map(|(index, (path, data))| {
            self.resolve_path(path.as_ref())
                .map(|resolved| BatchOp::Write {
                    path: resolved,
                    data: data.to_vec(),
                })
                .map_err(|e| pre_submit_err(index, e))
        })
        .collect::<std::result::Result<Vec<BatchOp>, BatchError>>()?;

    self.submit_batch(ops)
}
/// Submits one batch that deletes each listed path.
///
/// Every path is first resolved with `resolve_path`. Nothing is submitted
/// unless all paths resolve.
///
/// # Errors
///
/// If a path fails to resolve, returns a `BatchError` whose `failed_at` is
/// that entry's index and whose `completed` is 0. Otherwise returns
/// whatever the pipeline reports.
pub fn delete_batch<P: AsRef<Path>>(&self, batch: &[P]) -> std::result::Result<(), BatchError> {
    let ops = batch
        .iter()
        .enumerate()
        .map(|(index, path)| {
            self.resolve_path(path.as_ref())
                .map(|resolved| BatchOp::Delete { path: resolved })
                .map_err(|e| pre_submit_err(index, e))
        })
        .collect::<std::result::Result<Vec<BatchOp>, BatchError>>()?;

    self.submit_batch(ops)
}
/// Submits one batch that copies each `(src, dst)` pair.
///
/// Both paths of every pair are resolved with `resolve_path`, source first.
/// Nothing is submitted unless all paths resolve.
///
/// # Errors
///
/// If either path of a pair fails to resolve, returns a `BatchError` whose
/// `failed_at` is that pair's index and whose `completed` is 0. Otherwise
/// returns whatever the pipeline reports.
pub fn copy_batch<P: AsRef<Path>, Q: AsRef<Path>>(
    &self,
    batch: &[(P, Q)],
) -> std::result::Result<(), BatchError> {
    let ops = batch
        .iter()
        .enumerate()
        .map(
            |(index, (src, dst))| -> std::result::Result<BatchOp, BatchError> {
                let from = self
                    .resolve_path(src.as_ref())
                    .map_err(|e| pre_submit_err(index, e))?;
                let to = self
                    .resolve_path(dst.as_ref())
                    .map_err(|e| pre_submit_err(index, e))?;
                Ok(BatchOp::Copy { src: from, dst: to })
            },
        )
        .collect::<std::result::Result<Vec<BatchOp>, BatchError>>()?;

    self.submit_batch(ops)
}
pub fn batch(&self) -> Batch<'_> {
Batch::new(self)
}
pub(crate) fn snapshot(&self) -> HandleSnapshot {
HandleSnapshot {
method: self.active_method(),
sector_size: self.sector_size,
use_direct: self.use_direct(),
}
}
/// Hands `ops` to the pipeline together with a fresh snapshot of this
/// handle, and returns the pipeline's result.
pub(crate) fn submit_batch(&self, ops: Vec<BatchOp>) -> std::result::Result<(), BatchError> {
    let snapshot = self.snapshot();
    self.pipeline.submit(ops, snapshot)
}
/// Async counterpart of `submit_batch`: hands `ops` to the pipeline's
/// `submit_async` with a fresh snapshot and awaits the result.
#[cfg(feature = "async")]
pub(crate) async fn submit_batch_async(
    &self,
    ops: Vec<BatchOp>,
) -> std::result::Result<(), BatchError> {
    let snapshot = self.snapshot();
    self.pipeline.submit_async(ops, snapshot).await
}
}
/// Wraps an error raised before anything was submitted: `failed_at` is the
/// offending entry's index and `completed` is 0.
fn pre_submit_err(index: usize, e: Error) -> BatchError {
    let source = Box::new(e);
    BatchError {
        failed_at: index,
        completed: 0,
        source,
    }
}
const _: () = {
#[allow(dead_code)]
fn assert_send<T: Send>() {}
#[allow(dead_code)]
fn assert_sync<T: Sync>() {}
#[allow(dead_code)]
fn check() {
assert_send::<Handle>();
assert_sync::<Handle>();
}
};
#[cfg(test)]
mod tests {
    use super::*;
    use crate::method::Method;
    use crate::path::Mode;
    use crate::pipeline::PipelineConfig;

    /// Pool configuration shared by every test handle.
    fn default_pool_config() -> HandleBufferPoolConfig {
        HandleBufferPoolConfig {
            capacity: 64,
            block_size: 4096,
            block_align: 512,
        }
    }

    /// Builds a handle with explicit methods, root, and sector size; all
    /// other constructor arguments use fixed test defaults.
    fn make_handle_with(
        configured: Method,
        active: Method,
        root: Option<PathBuf>,
        sector_size: u32,
    ) -> Handle {
        Handle::new_raw(
            configured,
            active,
            root,
            Mode::Dev,
            sector_size,
            Pipeline::new(PipelineConfig::DEFAULT),
            default_pool_config(),
            128,
        )
    }

    /// Builds a rootless, 512-byte-sector handle whose active method is
    /// `method.resolve()`.
    fn make_handle(method: Method) -> Handle {
        make_handle_with(method, method.resolve(), None, 512)
    }

    #[test]
    fn test_method_accessor_roundtrip() {
        let h = make_handle(Method::Sync);
        assert_eq!(h.method(), Method::Sync);
    }

    #[test]
    fn test_active_method_reflects_resolved() {
        let h = make_handle(Method::Auto);
        let active = h.active_method();
        assert_ne!(active, Method::Auto, "active method must be concrete");
    }

    #[test]
    fn test_set_method_updates_active() {
        let h = make_handle(Method::Sync);
        h.set_method(Method::Data).expect("set_method");
        assert_eq!(h.method(), Method::Data);
        // The active method must follow the new configuration as well.
        assert_eq!(h.active_method(), Method::Data.resolve());
    }

    #[test]
    fn test_set_reserved_method_returns_error() {
        let h = make_handle(Method::Sync);
        match h.set_method(Method::Journal) {
            Err(Error::UnsupportedMethod { method }) => assert_eq!(method, "journal"),
            _ => panic!("expected UnsupportedMethod"),
        }
    }

    #[test]
    fn test_use_direct_reflects_method() {
        let h = make_handle_with(Method::Direct, Method::Direct, None, 512);
        assert!(h.use_direct());
        let h2 = make_handle(Method::Sync);
        assert!(!h2.use_direct());
    }

    #[test]
    fn test_resolve_path_no_root_passthrough() {
        let h = make_handle(Method::Sync);
        let p = PathBuf::from("some/relative/path");
        assert_eq!(h.resolve_path(&p).expect("resolve"), p);
    }

    #[test]
    fn test_resolve_path_with_root_joins() {
        let root = std::env::temp_dir();
        let h = make_handle_with(Method::Sync, Method::Sync, Some(root.clone()), 512);
        let resolved = h
            .resolve_path(Path::new("subdir/file.txt"))
            .expect("resolve");
        assert!(resolved.starts_with(&root));
    }

    #[test]
    fn test_resolve_path_folds_interior_parent_dir() {
        let root = std::env::temp_dir().join("jail");
        let h = make_handle_with(Method::Sync, Method::Sync, Some(root.clone()), 512);
        let resolved = h.resolve_path(Path::new("a/../b.txt")).expect("resolve");
        assert_eq!(resolved, root.join("b.txt"));
    }

    #[test]
    fn test_resolve_path_escape_is_rejected() {
        let root = std::env::temp_dir().join("jail");
        let h = make_handle_with(Method::Sync, Method::Sync, Some(root), 512);
        let result = h.resolve_path(Path::new("../../etc/passwd"));
        assert!(result.is_err(), "path escape must be rejected");
    }

    #[test]
    fn test_resolve_path_rooted_path_outside_root_is_rejected() {
        let root = std::env::temp_dir().join("jail");
        let h = make_handle_with(Method::Sync, Method::Sync, Some(root), 512);
        let result = h.resolve_path(Path::new("/etc/passwd"));
        assert!(result.is_err(), "path outside the root must be rejected");
    }

    #[test]
    fn test_gen_temp_path_has_fsys_prefix() {
        let path = PathBuf::from("/tmp/myfile.db");
        let tmp = Handle::gen_temp_path(&path);
        let name = tmp.file_name().unwrap().to_string_lossy();
        assert!(name.starts_with(".fsys-tmp-"), "got: {}", name);
    }

    #[test]
    fn test_sector_size_accessor() {
        let h = make_handle_with(Method::Sync, Method::Sync, None, 4096);
        assert_eq!(h.sector_size(), 4096);
    }
}