use std::path::Path;
use std::sync::atomic::{AtomicU64, Ordering};
static TMP_COUNTER: AtomicU64 = AtomicU64::new(0);
fn with_dir_lock<T>(
parent: &Path,
f: impl FnOnce() -> Result<T, crate::sink::HyphaError>,
) -> Result<T, crate::sink::HyphaError> {
use crate::sink::HyphaError;
use fs2::FileExt;
std::fs::create_dir_all(parent).map_err(|e| {
HyphaError::new(
"cache_write_failed",
format!("Failed to create directory: {}", e),
)
})?;
let lock_path = parent.join(".lock");
let lock_file = std::fs::OpenOptions::new()
.create(true)
.write(true)
.truncate(true)
.open(&lock_path)
.map_err(|e| {
HyphaError::new(
"cache_write_failed",
format!("Failed to open lock file: {}", e),
)
})?;
lock_file.lock_exclusive().map_err(|e| {
HyphaError::new(
"cache_write_failed",
format!("Failed to acquire lock: {}", e),
)
})?;
let result = f();
let _ = lock_file.unlock();
result
}
pub(super) fn locked_write_file(path: &Path, content: &str) -> Result<(), crate::sink::HyphaError> {
use crate::sink::HyphaError;
let parent = path.parent().ok_or_else(|| {
HyphaError::new("cache_write_failed", "Cannot determine parent directory")
})?;
with_dir_lock(parent, || atomic_write_file(path, content))
}
pub(super) fn locked_update_file(
path: &Path,
update: impl FnOnce(Option<String>) -> Result<String, crate::sink::HyphaError>,
) -> Result<(), crate::sink::HyphaError> {
use crate::sink::HyphaError;
let parent = path.parent().ok_or_else(|| {
HyphaError::new("cache_write_failed", "Cannot determine parent directory")
})?;
with_dir_lock(parent, || {
let existing = std::fs::read_to_string(path).ok();
let content = update(existing)?;
atomic_write_file(path, &content)
})
}
fn atomic_write_file(path: &Path, content: &str) -> Result<(), crate::sink::HyphaError> {
use crate::sink::HyphaError;
use std::io::Write;
let parent = path.parent().ok_or_else(|| {
HyphaError::new("cache_write_failed", "Cannot determine parent directory")
})?;
let file_name = path
.file_name()
.and_then(|name| name.to_str())
.unwrap_or("tmp");
let tmp_path = parent.join(format!(
".{}.tmp.{}.{}",
file_name,
std::process::id(),
TMP_COUNTER.fetch_add(1, Ordering::Relaxed)
));
let mut f = std::fs::OpenOptions::new()
.create_new(true)
.write(true)
.open(&tmp_path)
.map_err(|e| {
HyphaError::new(
"cache_write_failed",
format!("Failed to create temp file: {}", e),
)
})?;
f.write_all(content.as_bytes()).map_err(|e| {
let _ = std::fs::remove_file(&tmp_path);
HyphaError::new(
"cache_write_failed",
format!("Failed to write temp file: {}", e),
)
})?;
f.sync_all().map_err(|e| {
let _ = std::fs::remove_file(&tmp_path);
HyphaError::new(
"cache_write_failed",
format!("Failed to sync temp file: {}", e),
)
})?;
drop(f);
std::fs::rename(&tmp_path, path).map_err(|e| {
let _ = std::fs::remove_file(&tmp_path);
HyphaError::new(
"cache_write_failed",
format!("Failed to rename temp file: {}", e),
)
})?;
sync_parent_dir(parent);
Ok(())
}
fn sync_parent_dir(dir: &Path) {
if let Ok(handle) = std::fs::File::open(dir) {
let _ = handle.sync_all();
}
}
pub(super) fn read_spore_metadata(manifest_path: &Path) -> (String, String) {
if manifest_path.exists() {
if let Ok(content) = std::fs::read_to_string(manifest_path) {
if let Ok(manifest) = serde_json::from_str::<substrate::Spore>(&content) {
return (manifest.capsule.core.name, manifest.capsule.core.synopsis);
}
}
}
("unknown".to_string(), String::new())
}
pub(super) fn dir_size(path: &Path) -> u64 {
let mut size = 0;
let mut stack = vec![path.to_path_buf()];
while let Some(dir) = stack.pop() {
if let Ok(entries) = std::fs::read_dir(&dir) {
for entry in entries.filter_map(|e| e.ok()) {
let path = entry.path();
if path.is_file() {
size += entry.metadata().map(|m| m.len()).unwrap_or(0);
} else if path.is_dir() {
stack.push(path);
}
}
}
}
size
}