use crate::error::{PersistenceError, PersistenceResult};
use std::io::{Read, Write};
use std::path::PathBuf;
/// Flushes the contents of `path` (as resolved by `dir`) to stable storage.
///
/// The file is reopened read-only through its real filesystem path and
/// `File::sync_data` is called on that handle.
///
/// # Errors
///
/// * `PersistenceError::NotSupported` when `dir.file_path(path)` yields `None`,
///   i.e. the directory has no real filesystem path for the entry.
/// * Any I/O error raised while opening or syncing the file.
pub fn sync_file<D: Directory + ?Sized>(dir: &D, path: &str) -> PersistenceResult<()> {
    let target = match dir.file_path(path) {
        Some(resolved) => resolved,
        None => {
            return Err(PersistenceError::NotSupported(
                "sync_file requires Directory::file_path()".into(),
            ));
        }
    };
    let handle = std::fs::OpenOptions::new().read(true).open(&target)?;
    Ok(handle.sync_data()?)
}
/// Syncs the directory that contains `path` (as resolved by `dir`).
///
/// Resolves `path` to a real filesystem path and delegates to
/// [`sync_parent_of_path`].
///
/// # Errors
///
/// * `PersistenceError::NotSupported` when `dir.file_path(path)` yields `None`.
/// * Whatever [`sync_parent_of_path`] returns for the resolved path.
pub fn sync_parent_dir<D: Directory + ?Sized>(dir: &D, path: &str) -> PersistenceResult<()> {
    match dir.file_path(path) {
        Some(resolved) => sync_parent_of_path(&resolved),
        None => Err(PersistenceError::NotSupported(
            "sync_parent_dir requires Directory::file_path()".into(),
        )),
    }
}
/// Opens the parent directory of `path` and calls `File::sync_all` on it.
///
/// A bare relative name such as `wal.log` has the empty path as its parent;
/// that is interpreted as the current directory (`.`) so the sync still
/// targets the directory that actually contains the entry.
///
/// # Errors
///
/// * `PersistenceError::InvalidConfig` when `path` has no parent at all
///   (for example `/` or the empty path).
/// * Any I/O error raised while opening or syncing the parent directory.
pub fn sync_parent_of_path(path: &std::path::Path) -> PersistenceResult<()> {
    let Some(parent) = path.parent() else {
        return Err(PersistenceError::InvalidConfig(format!(
            "path has no parent directory: {path:?}"
        )));
    };
    // `Path::parent` returns `Some("")` for single-component relative paths,
    // and opening "" fails with NotFound even though the containing directory
    // (the current one) exists.
    let parent = if parent.as_os_str().is_empty() {
        std::path::Path::new(".")
    } else {
        parent
    };
    let f = std::fs::File::open(parent)?;
    f.sync_all()?;
    Ok(())
}
/// Selects when buffered writes should be flushed.
///
/// NOTE(review): the code that interprets this policy is not in this file; the
/// per-variant descriptions below are inferred from the variant names and
/// payload types and should be confirmed against the consumer.
///
/// Every payload is `Eq + Hash`, so the enum derives both in addition to
/// `PartialEq`; this lets policies be used as map/set keys and in `Eq`-bounded
/// generics.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum FlushPolicy {
    /// Presumably: flush after every append.
    PerAppend,
    /// Presumably: flush once per `N` appends.
    EveryN(usize),
    /// Presumably: flush when the given duration has elapsed.
    Interval(std::time::Duration),
    /// Presumably: never flush implicitly; the caller flushes explicitly.
    Manual,
}
/// Storage abstraction over a tree of byte files addressed by string paths.
///
/// This file provides two implementations: [`FsDirectory`] (real filesystem)
/// and [`MemoryDirectory`] (in-process map). The `*_durable` default methods
/// only work for implementations whose [`Directory::file_path`] returns
/// `Some`, because they sync through real filesystem handles.
pub trait Directory: Send + Sync {
    /// Opens `path` for writing and returns the writer. Both implementations
    /// in this file start the file from empty contents.
    fn create_file(&self, path: &str) -> PersistenceResult<Box<dyn Write + Send>>;

    /// Opens `path` for reading. Both implementations in this file report a
    /// missing entry as `PersistenceError::NotFound`.
    fn open_file(&self, path: &str) -> PersistenceResult<Box<dyn Read + Send>>;

    /// Reports whether an entry exists at `path`.
    fn exists(&self, path: &str) -> bool;

    /// Removes the entry at `path`. Both implementations in this file succeed
    /// when nothing is there.
    fn delete(&self, path: &str) -> PersistenceResult<()>;

    /// Moves the entry at `from` to `to`.
    fn atomic_rename(&self, from: &str, to: &str) -> PersistenceResult<()>;

    /// Ensures the directory `path` (and its ancestors) exists.
    fn create_dir_all(&self, path: &str) -> PersistenceResult<()>;

    /// Lists the names of the immediate children of `path`.
    fn list_dir(&self, path: &str) -> PersistenceResult<Vec<String>>;

    /// Opens `path` for appending, creating it when missing.
    fn append_file(&self, path: &str) -> PersistenceResult<Box<dyn Write + Send>>;

    /// Replaces the contents of `path` with `data`.
    fn atomic_write(&self, path: &str, data: &[u8]) -> PersistenceResult<()>;

    /// Returns the real filesystem path behind `path`, or `None` when the
    /// implementation has no such path (as [`MemoryDirectory`] does).
    fn file_path(&self, path: &str) -> Option<PathBuf>;

    /// Syncs the file at `path` via [`sync_file`].
    fn durable_sync_file(&self, path: &str) -> PersistenceResult<()> {
        sync_file(self, path)
    }

    /// Syncs the directory containing `path` via [`sync_parent_dir`].
    fn durable_sync_parent_dir(&self, path: &str) -> PersistenceResult<()> {
        sync_parent_dir(self, path)
    }

    /// Renames `from` to `to`, then syncs the affected parent directories.
    ///
    /// The source's parent is synced only when it differs from the
    /// destination's parent; the destination's parent is always synced.
    ///
    /// # Errors
    ///
    /// `PersistenceError::NotSupported` when either path has no real
    /// filesystem path; otherwise any error from the rename or the syncs.
    fn atomic_rename_durable(&self, from: &str, to: &str) -> PersistenceResult<()> {
        let unsupported = || {
            PersistenceError::NotSupported(
                "atomic_rename_durable requires Directory::file_path()".into(),
            )
        };
        // Resolve both ends before renaming so an unsupported directory is
        // rejected without touching any entry.
        let Some(source) = self.file_path(from) else {
            return Err(unsupported());
        };
        let Some(destination) = self.file_path(to) else {
            return Err(unsupported());
        };

        self.atomic_rename(from, to)?;

        let shares_parent = source.parent() == destination.parent();
        if !shares_parent {
            self.durable_sync_parent_dir(from)?;
        }
        self.durable_sync_parent_dir(to)
    }

    /// Writes `data` to `{path}.tmp`, syncs it, then durably renames it onto
    /// `path`. On any failure the temporary entry is deleted on a best-effort
    /// basis and the original error is returned.
    ///
    /// # Errors
    ///
    /// `PersistenceError::NotSupported` when `path` has no real filesystem
    /// path; otherwise the first error from writing, syncing or renaming.
    fn atomic_write_durable(&self, path: &str, data: &[u8]) -> PersistenceResult<()> {
        if self.file_path(path).is_none() {
            return Err(PersistenceError::NotSupported(
                "atomic_write_durable requires Directory::file_path()".into(),
            ));
        }

        let staging = format!("{path}.tmp");
        let outcome = (|| -> PersistenceResult<()> {
            {
                // Scoped so the writer is dropped before the file is synced.
                let mut writer = self.create_file(&staging)?;
                writer.write_all(data)?;
                writer.flush()?;
            }
            self.durable_sync_file(&staging)?;
            self.atomic_rename_durable(&staging, path)
        })();

        if outcome.is_err() {
            // Best-effort cleanup; the original failure is what gets reported.
            let _ = self.delete(&staging);
        }
        outcome
    }
}
/// A directory rooted at a location on the real filesystem.
///
/// Derives `Debug` so values can appear in logs and assertion output, and
/// `Clone` so handles can be duplicated cheaply (the only state is the root
/// path).
#[derive(Debug, Clone)]
pub struct FsDirectory {
    /// Base path that relative entry paths are joined onto.
    root: PathBuf,
}
impl FsDirectory {
pub fn new(root: impl Into<PathBuf>) -> PersistenceResult<Self> {
let root = root.into();
std::fs::create_dir_all(&root)?;
Ok(Self { root })
}
pub fn arc(
root: impl Into<std::path::PathBuf>,
) -> PersistenceResult<std::sync::Arc<dyn Directory>> {
Ok(std::sync::Arc::new(Self::new(root)?))
}
fn resolve_path(&self, path: &str) -> PersistenceResult<PathBuf> {
for component in std::path::Path::new(path).components() {
match component {
std::path::Component::ParentDir
| std::path::Component::RootDir
| std::path::Component::Prefix(_) => {
return Err(PersistenceError::InvalidConfig(format!(
"path must not contain '..', absolute, or prefix components: {path}"
)));
}
_ => {}
}
}
Ok(self.root.join(path))
}
}
impl Directory for FsDirectory {
    /// Creates (or truncates) the file at `path`, creating missing parent
    /// directories first.
    fn create_file(&self, path: &str) -> PersistenceResult<Box<dyn Write + Send>> {
        let full_path = self.resolve_path(path)?;
        if let Some(parent) = full_path.parent() {
            std::fs::create_dir_all(parent)?;
        }
        Ok(Box::new(std::fs::File::create(full_path)?))
    }

    /// Opens the file at `path` for reading.
    ///
    /// # Errors
    ///
    /// `PersistenceError::NotFound` when the path does not exist; any other
    /// open failure is returned as the underlying I/O error.
    fn open_file(&self, path: &str) -> PersistenceResult<Box<dyn Read + Send>> {
        let full_path = self.resolve_path(path)?;
        // Open first and classify afterwards: checking existence before the
        // open would let a concurrent removal surface as a raw I/O error
        // instead of `NotFound`.
        let file = match std::fs::File::open(&full_path) {
            Ok(file) => file,
            Err(_) if !full_path.exists() => {
                return Err(PersistenceError::NotFound(full_path.display().to_string()));
            }
            Err(e) => return Err(e.into()),
        };
        Ok(Box::new(file))
    }

    /// Reports whether `path` exists; invalid paths count as absent.
    fn exists(&self, path: &str) -> bool {
        self.resolve_path(path).map(|p| p.exists()).unwrap_or(false)
    }

    /// Removes the file or directory tree at `path`. Succeeds when nothing is
    /// there, including when the entry disappears while it is being removed.
    fn delete(&self, path: &str) -> PersistenceResult<()> {
        let full_path = self.resolve_path(path)?;
        let removal = if full_path.is_dir() {
            std::fs::remove_dir_all(&full_path)
        } else if full_path.exists() {
            std::fs::remove_file(&full_path)
        } else {
            return Ok(());
        };
        match removal {
            // Someone else removed it between the check and the removal; the
            // postcondition ("entry is gone") still holds.
            Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(()),
            Err(e) => Err(e.into()),
            Ok(()) => Ok(()),
        }
    }

    /// Renames `from` to `to`, creating the destination's parent directories
    /// first.
    fn atomic_rename(&self, from: &str, to: &str) -> PersistenceResult<()> {
        let from_path = self.resolve_path(from)?;
        let to_path = self.resolve_path(to)?;
        if let Some(parent) = to_path.parent() {
            std::fs::create_dir_all(parent)?;
        }
        std::fs::rename(from_path, to_path)?;
        Ok(())
    }

    /// Creates the directory `path` and any missing ancestors.
    fn create_dir_all(&self, path: &str) -> PersistenceResult<()> {
        std::fs::create_dir_all(self.resolve_path(path)?)?;
        Ok(())
    }

    /// Returns the sorted names of the entries directly under `path`, or an
    /// empty list when `path` does not exist. Non-UTF-8 names are converted
    /// lossily.
    fn list_dir(&self, path: &str) -> PersistenceResult<Vec<String>> {
        let full_path = self.resolve_path(path)?;
        if !full_path.exists() {
            return Ok(Vec::new());
        }
        let entries = std::fs::read_dir(full_path)?;
        let mut out = Vec::new();
        for entry in entries {
            let entry = entry?;
            out.push(entry.file_name().to_string_lossy().into_owned());
        }
        out.sort();
        Ok(out)
    }

    /// Opens `path` in append mode, creating the file and its parent
    /// directories when missing.
    fn append_file(&self, path: &str) -> PersistenceResult<Box<dyn Write + Send>> {
        let full_path = self.resolve_path(path)?;
        if let Some(parent) = full_path.parent() {
            std::fs::create_dir_all(parent)?;
        }
        let file = std::fs::OpenOptions::new()
            .create(true)
            .append(true)
            .open(full_path)?;
        Ok(Box::new(file))
    }

    /// Writes `data` to `{path}.tmp`, syncs it, renames it onto `path` and
    /// syncs the containing directory. The temporary file is removed on a
    /// best-effort basis when writing or renaming fails.
    fn atomic_write(&self, path: &str, data: &[u8]) -> PersistenceResult<()> {
        // Validate the destination before creating anything: a path such as
        // ".." is rejected, yet "...tmp" is a legal file name, so validating
        // late would leave a stray temp file behind.
        let full_path = self.resolve_path(path)?;
        let temp_path = format!("{path}.tmp");
        let full_temp_path = self.resolve_path(&temp_path)?;
        if let Some(parent) = full_temp_path.parent() {
            std::fs::create_dir_all(parent)?;
        }
        if let Err(e) = (|| -> PersistenceResult<()> {
            let mut temp_file = std::fs::File::create(&full_temp_path)?;
            temp_file.write_all(data)?;
            temp_file.sync_all()?;
            Ok(())
        })() {
            let _ = std::fs::remove_file(&full_temp_path);
            return Err(e);
        }
        if let Err(e) = std::fs::rename(&full_temp_path, &full_path) {
            let _ = std::fs::remove_file(&full_temp_path);
            return Err(e.into());
        }
        if let Some(parent) = full_path.parent() {
            // With an empty relative root the parent is "", which cannot be
            // opened; the containing directory is then the current one.
            let parent = if parent.as_os_str().is_empty() {
                std::path::Path::new(".")
            } else {
                parent
            };
            let parent_file = std::fs::File::open(parent)?;
            parent_file.sync_all()?;
        }
        Ok(())
    }

    /// Returns the resolved filesystem path, or `None` when `path` is
    /// rejected by path validation.
    fn file_path(&self, path: &str) -> Option<PathBuf> {
        self.resolve_path(path).ok()
    }
}
/// An in-process directory that keeps every file as a byte vector in a map
/// keyed by its full path string.
///
/// The map sits behind `Arc<RwLock<…>>`, so clones of a `MemoryDirectory`
/// share the same files. `Debug` is derived so the type can be logged and
/// shown in assertion output like other public types.
#[derive(Debug, Clone, Default)]
pub struct MemoryDirectory {
    /// Path → contents for every stored file.
    files: std::sync::Arc<std::sync::RwLock<std::collections::HashMap<String, Vec<u8>>>>,
}
impl MemoryDirectory {
    /// Creates an empty in-memory directory.
    pub fn new() -> Self {
        Default::default()
    }

    /// Creates an empty in-memory directory wrapped as a shared
    /// `Arc<dyn Directory>` trait object.
    pub fn arc() -> std::sync::Arc<dyn Directory> {
        let shared: std::sync::Arc<dyn Directory> = std::sync::Arc::new(Self::new());
        shared
    }
}
impl Directory for MemoryDirectory {
fn create_file(&self, path: &str) -> PersistenceResult<Box<dyn Write + Send>> {
self.files
.write()
.map_err(|_| PersistenceError::LockFailed {
resource: "memory directory".to_string(),
reason: "lock poisoned".to_string(),
})?
.insert(path.to_string(), Vec::new());
Ok(Box::new(MemoryInPlaceWriter {
files: self.files.clone(),
path: path.to_string(),
}))
}
fn open_file(&self, path: &str) -> PersistenceResult<Box<dyn Read + Send>> {
let files = self
.files
.read()
.map_err(|_| PersistenceError::LockFailed {
resource: "memory directory".to_string(),
reason: "lock poisoned".to_string(),
})?;
let data = files
.get(path)
.ok_or_else(|| PersistenceError::NotFound(path.to_string()))?
.clone();
Ok(Box::new(std::io::Cursor::new(data)))
}
fn exists(&self, path: &str) -> bool {
self.files
.read()
.map(|f| f.contains_key(path))
.unwrap_or(false)
}
fn delete(&self, path: &str) -> PersistenceResult<()> {
let mut files = self
.files
.write()
.map_err(|_| PersistenceError::LockFailed {
resource: "memory directory".to_string(),
reason: "lock poisoned".to_string(),
})?;
files.remove(path);
let prefix = format!("{path}/");
files.retain(|k, _| !k.starts_with(&prefix));
Ok(())
}
fn atomic_rename(&self, from: &str, to: &str) -> PersistenceResult<()> {
let mut files = self
.files
.write()
.map_err(|_| PersistenceError::LockFailed {
resource: "memory directory".to_string(),
reason: "lock poisoned".to_string(),
})?;
let data = files
.remove(from)
.ok_or_else(|| PersistenceError::NotFound(from.to_string()))?;
files.insert(to.to_string(), data);
Ok(())
}
fn create_dir_all(&self, _path: &str) -> PersistenceResult<()> {
Ok(())
}
fn list_dir(&self, path: &str) -> PersistenceResult<Vec<String>> {
let files = self
.files
.read()
.map_err(|_| PersistenceError::LockFailed {
resource: "memory directory".to_string(),
reason: "lock poisoned".to_string(),
})?;
let prefix = if path.is_empty() {
"".to_string()
} else {
format!("{path}/")
};
let result: std::collections::BTreeSet<String> = files
.keys()
.filter(|k| k.starts_with(&prefix))
.filter_map(|k| {
let rest = k.strip_prefix(&prefix).unwrap_or(k);
let first_component = rest.split('/').next().unwrap_or(rest);
if first_component.is_empty() {
None
} else {
Some(first_component.to_string())
}
})
.collect();
Ok(result.into_iter().collect())
}
fn append_file(&self, path: &str) -> PersistenceResult<Box<dyn Write + Send>> {
{
let mut files = self
.files
.write()
.map_err(|_| PersistenceError::LockFailed {
resource: "memory directory".to_string(),
reason: "lock poisoned".to_string(),
})?;
files.entry(path.to_string()).or_insert_with(Vec::new);
}
Ok(Box::new(MemoryInPlaceWriter {
files: self.files.clone(),
path: path.to_string(),
}))
}
fn atomic_write(&self, path: &str, data: &[u8]) -> PersistenceResult<()> {
let mut files = self
.files
.write()
.map_err(|_| PersistenceError::LockFailed {
resource: "memory directory".to_string(),
reason: "lock poisoned".to_string(),
})?;
files.insert(path.to_string(), data.to_vec());
Ok(())
}
fn file_path(&self, _path: &str) -> Option<PathBuf> {
None
}
}
/// Writer that appends straight into the shared in-memory file map.
struct MemoryInPlaceWriter {
    /// Shared path → contents map that receives the bytes.
    files: std::sync::Arc<std::sync::RwLock<std::collections::HashMap<String, Vec<u8>>>>,
    /// Key of the file this writer appends to.
    path: String,
}

impl Write for MemoryInPlaceWriter {
    /// Appends all of `buf` to the entry for `self.path`, creating the entry
    /// when it is missing, and reports `buf.len()` bytes written.
    ///
    /// Fails with an `io::Error` ("lock poisoned") when the map's lock is
    /// poisoned.
    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
        let Ok(mut map) = self.files.write() else {
            return Err(std::io::Error::other("lock poisoned"));
        };
        match map.get_mut(&self.path) {
            Some(contents) => contents.extend_from_slice(buf),
            None => {
                map.insert(self.path.clone(), buf.to_vec());
            }
        }
        Ok(buf.len())
    }

    /// Nothing is buffered, so flushing always succeeds.
    fn flush(&mut self) -> std::io::Result<()> {
        Ok(())
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Syncing the parent of a file inside an existing temp dir succeeds.
    #[cfg(unix)]
    #[test]
    fn sync_parent_of_path_existing_parent_succeeds() {
        let scratch = tempfile::tempdir().expect("tempdir");
        let file = scratch.path().join("some_file.txt");
        std::fs::write(&file, b"hi").expect("write");
        sync_parent_of_path(&file).expect("sync_parent_of_path on existing parent");
    }

    /// A parent directory that does not exist cannot be opened, so the sync
    /// must fail.
    #[cfg(unix)]
    #[test]
    fn sync_parent_of_path_missing_parent_errors() {
        let scratch = tempfile::tempdir().expect("tempdir");
        let nonexistent = scratch
            .path()
            .join("definitely-not-a-real-subdir/file.txt");
        assert!(sync_parent_of_path(&nonexistent).is_err());
    }

    /// The root path has no parent and must be rejected as `InvalidConfig`.
    #[test]
    fn sync_parent_of_path_root_errors_with_invalid_config() {
        let other = sync_parent_of_path(std::path::Path::new("/"));
        if !matches!(other, Err(PersistenceError::InvalidConfig(_))) {
            panic!("expected InvalidConfig for root path, got {other:?}");
        }
    }
}