use crate::batch::Batch;
use crate::error::BatchError;
use crate::method::Method;
use crate::path::Mode;
use crate::pipeline::{BatchOp, HandleSnapshot, Pipeline};
use crate::{Error, Result};
use std::path::{Path, PathBuf};
use std::sync::atomic::AtomicU64;
use std::sync::atomic::{AtomicU8, Ordering};
static WRITE_COUNTER: AtomicU64 = AtomicU64::new(0);
pub struct Handle {
configured_method: AtomicU8,
active_method: AtomicU8,
root: Option<PathBuf>,
mode: Mode,
sector_size: u32,
pipeline: Pipeline,
}
impl Handle {
pub(crate) fn new_raw(
configured_method: Method,
active_method: Method,
root: Option<PathBuf>,
mode: Mode,
sector_size: u32,
pipeline: Pipeline,
) -> Self {
Self {
configured_method: AtomicU8::new(configured_method.to_u8()),
active_method: AtomicU8::new(active_method.to_u8()),
root,
mode,
sector_size,
pipeline,
}
}
#[must_use]
pub fn method(&self) -> Method {
Method::from_u8(self.configured_method.load(Ordering::Relaxed))
}
#[must_use]
pub fn active_method(&self) -> Method {
Method::from_u8(self.active_method.load(Ordering::Relaxed))
}
pub fn set_method(&self, method: Method) -> Result<()> {
if method.is_reserved() {
return Err(Error::UnsupportedMethod {
method: method.as_str(),
});
}
let resolved = method.resolve();
self.configured_method
.store(method.to_u8(), Ordering::Relaxed);
self.active_method
.store(resolved.to_u8(), Ordering::Relaxed);
Ok(())
}
#[must_use]
pub fn root(&self) -> Option<&Path> {
self.root.as_deref()
}
#[must_use]
pub fn mode(&self) -> Mode {
self.mode
}
#[must_use]
pub fn sector_size(&self) -> u32 {
self.sector_size
}
pub(crate) fn update_active_method(&self, method: Method) {
self.active_method.store(method.to_u8(), Ordering::Relaxed);
}
pub(crate) fn use_direct(&self) -> bool {
self.active_method() == Method::Direct
}
pub(crate) fn resolve_path(&self, path: &Path) -> Result<PathBuf> {
let Some(root) = &self.root else {
return Ok(path.to_owned());
};
let candidate = if path.is_absolute() {
path.to_owned()
} else {
root.join(path)
};
let mut resolved = PathBuf::new();
for component in candidate.components() {
use std::path::Component;
match component {
Component::Prefix(p) => {
resolved.push(p.as_os_str());
}
Component::RootDir => {
resolved.push(component);
}
Component::CurDir => {
}
Component::ParentDir => {
if !resolved.pop() {
return Err(Error::InvalidPath {
path: path.to_owned(),
reason: "path escapes the handle root".into(),
});
}
}
Component::Normal(n) => {
resolved.push(n);
}
}
}
if !resolved.starts_with(root) {
return Err(Error::InvalidPath {
path: path.to_owned(),
reason: "path escapes the handle root".into(),
});
}
Ok(resolved)
}
pub(crate) fn gen_temp_path(path: &Path) -> PathBuf {
let n = WRITE_COUNTER.fetch_add(1, Ordering::Relaxed);
let parent = path.parent().unwrap_or_else(|| Path::new("."));
let stem = path
.file_name()
.map(|n| n.to_string_lossy().into_owned())
.unwrap_or_default();
let name = format!(".fsys-tmp-{}.{}", n, stem);
parent.join(name)
}
pub fn write_batch<P: AsRef<Path>>(
&self,
batch: &[(P, &[u8])],
) -> std::result::Result<(), BatchError> {
let mut ops: Vec<BatchOp> = Vec::with_capacity(batch.len());
for (i, (path, data)) in batch.iter().enumerate() {
let resolved = self
.resolve_path(path.as_ref())
.map_err(|e| pre_submit_err(i, e))?;
ops.push(BatchOp::Write {
path: resolved,
data: data.to_vec(),
});
}
self.submit_batch(ops)
}
pub fn delete_batch<P: AsRef<Path>>(&self, batch: &[P]) -> std::result::Result<(), BatchError> {
let mut ops: Vec<BatchOp> = Vec::with_capacity(batch.len());
for (i, path) in batch.iter().enumerate() {
let resolved = self
.resolve_path(path.as_ref())
.map_err(|e| pre_submit_err(i, e))?;
ops.push(BatchOp::Delete { path: resolved });
}
self.submit_batch(ops)
}
pub fn copy_batch<P: AsRef<Path>, Q: AsRef<Path>>(
&self,
batch: &[(P, Q)],
) -> std::result::Result<(), BatchError> {
let mut ops: Vec<BatchOp> = Vec::with_capacity(batch.len());
for (i, (src, dst)) in batch.iter().enumerate() {
let resolved_src = self
.resolve_path(src.as_ref())
.map_err(|e| pre_submit_err(i, e))?;
let resolved_dst = self
.resolve_path(dst.as_ref())
.map_err(|e| pre_submit_err(i, e))?;
ops.push(BatchOp::Copy {
src: resolved_src,
dst: resolved_dst,
});
}
self.submit_batch(ops)
}
pub fn batch(&self) -> Batch<'_> {
Batch::new(self)
}
pub(crate) fn snapshot(&self) -> HandleSnapshot {
HandleSnapshot {
method: self.active_method(),
sector_size: self.sector_size,
use_direct: self.use_direct(),
}
}
pub(crate) fn submit_batch(&self, ops: Vec<BatchOp>) -> std::result::Result<(), BatchError> {
self.pipeline.submit(ops, self.snapshot())
}
}
fn pre_submit_err(index: usize, e: Error) -> BatchError {
BatchError {
failed_at: index,
completed: 0,
source: Box::new(e),
}
}
const _: () = {
#[allow(dead_code)]
fn assert_send<T: Send>() {}
#[allow(dead_code)]
fn assert_sync<T: Sync>() {}
#[allow(dead_code)]
fn check() {
assert_send::<Handle>();
assert_sync::<Handle>();
}
};
#[cfg(test)]
mod tests {
use super::*;
use crate::method::Method;
use crate::path::Mode;
use crate::pipeline::PipelineConfig;
fn make_handle(method: Method) -> Handle {
Handle::new_raw(
method,
method.resolve(),
None,
Mode::Dev,
512,
Pipeline::new(PipelineConfig::DEFAULT),
)
}
#[test]
fn test_method_accessor_roundtrip() {
let h = make_handle(Method::Sync);
assert_eq!(h.method(), Method::Sync);
}
#[test]
fn test_active_method_reflects_resolved() {
let h = make_handle(Method::Auto);
let active = h.active_method();
assert_ne!(active, Method::Auto, "active method must be concrete");
}
#[test]
fn test_set_method_updates_active() {
let h = make_handle(Method::Sync);
h.set_method(Method::Data).expect("set_method");
assert_eq!(h.method(), Method::Data);
}
#[test]
fn test_set_reserved_method_returns_error() {
let h = make_handle(Method::Sync);
let err = h.set_method(Method::Mmap);
assert!(err.is_err());
if let Err(Error::UnsupportedMethod { method }) = err {
assert_eq!(method, "mmap");
} else {
panic!("expected UnsupportedMethod");
}
}
#[test]
fn test_use_direct_reflects_method() {
let h = Handle::new_raw(
Method::Direct,
Method::Direct,
None,
Mode::Dev,
512,
Pipeline::new(PipelineConfig::DEFAULT),
);
assert!(h.use_direct());
let h2 = make_handle(Method::Sync);
assert!(!h2.use_direct());
}
#[test]
fn test_resolve_path_no_root_passthrough() {
let h = make_handle(Method::Sync);
let p = PathBuf::from("some/relative/path");
assert_eq!(h.resolve_path(&p).expect("resolve"), p);
}
#[test]
fn test_resolve_path_with_root_joins() {
let root = std::env::temp_dir();
let h = Handle::new_raw(
Method::Sync,
Method::Sync,
Some(root.clone()),
Mode::Dev,
512,
Pipeline::new(PipelineConfig::DEFAULT),
);
let resolved = h
.resolve_path(Path::new("subdir/file.txt"))
.expect("resolve");
assert!(resolved.starts_with(&root));
}
#[test]
fn test_resolve_path_escape_is_rejected() {
let root = std::env::temp_dir().join("jail");
let h = Handle::new_raw(
Method::Sync,
Method::Sync,
Some(root),
Mode::Dev,
512,
Pipeline::new(PipelineConfig::DEFAULT),
);
let result = h.resolve_path(Path::new("../../etc/passwd"));
assert!(result.is_err(), "path escape must be rejected");
}
#[test]
fn test_gen_temp_path_has_fsys_prefix() {
let path = PathBuf::from("/tmp/myfile.db");
let tmp = Handle::gen_temp_path(&path);
let name = tmp.file_name().unwrap().to_string_lossy();
assert!(name.starts_with(".fsys-tmp-"), "got: {}", name);
}
#[test]
fn test_sector_size_accessor() {
let h = Handle::new_raw(
Method::Sync,
Method::Sync,
None,
Mode::Dev,
4096,
Pipeline::new(PipelineConfig::DEFAULT),
);
assert_eq!(h.sector_size(), 4096);
}
}