use std::collections::HashMap;
use std::fs;
use std::io;
use std::io::Write as _;
use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, Mutex, OnceLock};
use std::time::{SystemTime, UNIX_EPOCH};
use serde::{de::DeserializeOwned, Serialize};
use tempfile::NamedTempFile;
use crate::backend::StorageBackend;
use crate::codec::{Codec, JsonCodec};
use crate::error::StorageError;
use crate::memory::{
append_log_storage, kv_storage, snapshot_storage, AppendLogStorage, AppendLogStorageOptions,
KvStorage, KvStorageOptions, SnapshotStorage, SnapshotStorageOptions,
};
/// Extension appended to every encoded key to form its on-disk filename.
const FILE_SUFFIX: &str = ".bin";
/// Lowercase hexadecimal digits used when percent-escaping key bytes.
const HEX_LOWER: &[u8; 16] = b"0123456789abcdef";
/// Storage backend that keeps every key in its own file inside one directory.
///
/// Keys are percent-encoded into filenames carrying a `.bin` suffix, so any
/// UTF-8 key maps to an ASCII filename.
#[derive(Debug)]
pub struct FileBackend {
    /// Directory holding one file per key; created lazily by the first write.
    dir: PathBuf,
    /// Backend name of the form `file:<dir>`, reported by `StorageBackend::name`.
    name: String,
    /// When `false`, `list` skips directory entries whose names start with `.`.
    include_hidden: bool,
    /// Case-handling mode of `dir`, resolved once on first use by a write.
    case_state: OnceLock<CaseState>,
    /// Test-hook override: `Some(true)` / `Some(false)` skips the on-disk probe.
    case_override: Option<bool>,
}
/// Resolved case-handling mode for a `FileBackend` directory.
#[derive(Debug)]
enum CaseState {
    /// Filenames differing only in case are distinct; no collision tracking.
    Sensitive,
    /// Filenames differing only in ASCII case are treated as the same file.
    Insensitive {
        /// Maps each ASCII-lowercased filename to the exact-case filename that
        /// claimed it first, so a differently-cased key can be rejected.
        seen: Mutex<HashMap<String, String>>,
    },
}
impl FileBackend {
#[must_use]
pub fn new(dir: impl AsRef<Path>) -> Self {
let dir = dir.as_ref().to_path_buf();
let name = format!("file:{}", dir.display());
Self {
dir,
name,
include_hidden: false,
case_state: OnceLock::new(),
case_override: None,
}
}
#[must_use]
pub fn with_include_hidden(mut self, include: bool) -> Self {
self.include_hidden = include;
self
}
#[cfg(any(test, feature = "test-hooks"))]
#[doc(hidden)]
#[must_use]
pub fn with_case_insensitive(mut self, forced: bool) -> Self {
self.case_override = Some(forced);
self
}
#[must_use]
pub fn dir(&self) -> &Path {
&self.dir
}
#[must_use]
pub fn include_hidden(&self) -> bool {
self.include_hidden
}
fn path_for(&self, key: &str) -> PathBuf {
let mut filename = encode_key_to_filename(key);
filename.push_str(FILE_SUFFIX);
self.dir.join(filename)
}
fn filename_for(key: &str) -> String {
let mut filename = encode_key_to_filename(key);
filename.push_str(FILE_SUFFIX);
filename
}
fn ensure_case_state(&self) -> &CaseState {
self.case_state.get_or_init(|| {
if let Some(forced) = self.case_override {
return if forced {
CaseState::Insensitive {
seen: Mutex::new(HashMap::new()),
}
} else {
CaseState::Sensitive
};
}
match probe_case_sensitivity(&self.dir) {
Some(true) => CaseState::Insensitive {
seen: Mutex::new(HashMap::new()),
},
Some(false) | None => CaseState::Sensitive,
}
})
}
fn check_case_collision(&self, key: &str) -> Result<(), StorageError> {
let CaseState::Insensitive { seen } = self.ensure_case_state() else {
return Ok(());
};
let filename = Self::filename_for(key);
let folded = filename.to_ascii_lowercase();
let mut guard = seen.lock().expect("case-collision tracker poisoned");
if let Some(existing) = guard.get(&folded) {
if existing != &filename {
return Err(StorageError::BackendError {
message: format!(
"case-insensitive filesystem collision: existing key \
file {existing:?} and new key file {filename:?} \
(encoded from {key:?}) map to the same on-disk path \
when case-folded; FileBackend rejects to prevent \
silent overwrite",
),
source: None,
});
}
} else {
guard.insert(folded, filename);
}
Ok(())
}
fn release_case_slot(&self, key: &str) {
let Some(CaseState::Insensitive { seen }) = self.case_state.get() else {
return;
};
let filename = Self::filename_for(key);
let folded = filename.to_ascii_lowercase();
if let Ok(mut guard) = seen.lock() {
if guard.get(&folded) == Some(&filename) {
guard.remove(&folded);
}
}
}
}
/// Per-process counter mixed into probe filenames (alongside the pid and a
/// nanosecond timestamp) so two probes never pick the same name.
static PROBE_NONCE: AtomicU64 = AtomicU64::new(0);
/// Best-effort removal of case-probe files left behind in `dir`, for example
/// by a process that crashed mid-probe.
///
/// Each directory is swept at most once per process; the set of swept paths
/// is kept in a process-wide static. Only probe files older than
/// `MIN_ORPHAN_AGE` are removed. A younger file may belong to a probe that is
/// still running in this or another process, and deleting it would make that
/// prober misread the filesystem as case-sensitive. Files whose age cannot be
/// determined are left alone. All I/O errors are ignored.
fn sweep_orphan_probe_files(dir: &Path) {
    use std::collections::HashSet;
    use std::time::Duration;

    /// A probe lives for microseconds; anything this old is an orphan.
    const MIN_ORPHAN_AGE: Duration = Duration::from_secs(60);

    static SWEPT: OnceLock<Mutex<HashSet<PathBuf>>> = OnceLock::new();
    let swept = SWEPT.get_or_init(|| Mutex::new(HashSet::new()));
    let Ok(mut guard) = swept.lock() else {
        return;
    };
    if guard.contains(dir) {
        return;
    }
    if let Ok(entries) = fs::read_dir(dir) {
        let now = SystemTime::now();
        for entry in entries.flatten() {
            let name = entry.file_name();
            let Some(name_str) = name.to_str() else {
                continue;
            };
            // Probe names are written in lowercase and checked in uppercase,
            // so either spelling may be what is left on disk.
            let is_probe =
                name_str.starts_with(".gr-case-probe-") || name_str.starts_with(".GR-CASE-PROBE-");
            if !is_probe {
                continue;
            }
            let is_stale = entry
                .metadata()
                .and_then(|meta| meta.modified())
                .ok()
                .and_then(|modified| now.duration_since(modified).ok())
                .is_some_and(|age| age >= MIN_ORPHAN_AGE);
            if is_stale {
                let _ = fs::remove_file(entry.path());
            }
        }
    }
    guard.insert(dir.to_path_buf());
}
/// Probes whether `dir` lives on a case-insensitive filesystem.
///
/// Creates a uniquely named lowercase probe file and checks whether its
/// all-uppercase spelling resolves to an existing file.
///
/// Returns `Some(true)` for case-insensitive, `Some(false)` for case-sensitive,
/// and `None` when the probe could not be carried out. That covers a failure
/// to create the directory, a system clock before the Unix epoch, or a failure
/// to write the probe file. Probe files are removed afterwards on a
/// best-effort basis.
fn probe_case_sensitivity(dir: &Path) -> Option<bool> {
    fs::create_dir_all(dir).ok()?;
    sweep_orphan_probe_files(dir);
    let nanos = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .ok()?
        .as_nanos();
    let pid = std::process::id();
    let nonce = PROBE_NONCE.fetch_add(1, Ordering::Relaxed);
    let lower_name = format!(".gr-case-probe-{pid}-{nanos}-{nonce}-a.bin");
    let upper_name = lower_name.to_ascii_uppercase();
    let lower_path = dir.join(&lower_name);
    let upper_path = dir.join(&upper_name);
    // Without a probe file the uppercase lookup below would always fail and
    // wrongly report "case-sensitive"; report "unknown" instead. The file may
    // have been created before the write failed, so try to remove it.
    if fs::write(&lower_path, b"probe").is_err() {
        let _ = fs::remove_file(&lower_path);
        return None;
    }
    let result = fs::metadata(&upper_path).is_ok();
    let _ = fs::remove_file(&lower_path);
    // On a case-insensitive filesystem this is the same file and is already
    // gone; the NotFound error is expected and ignored.
    let _ = fs::remove_file(&upper_path);
    Some(result)
}
/// Convenience constructor: a [`FileBackend`] rooted at `dir`, wrapped in an
/// `Arc` so it can be shared between storages.
#[must_use]
pub fn file_backend(dir: impl AsRef<Path>) -> Arc<FileBackend> {
    let backend = FileBackend::new(dir);
    Arc::from(backend)
}
impl StorageBackend for FileBackend {
    /// Returns `file:<dir>`.
    fn name(&self) -> &str {
        &self.name
    }

    /// Reads the full contents stored for `key`, or `None` if no file exists.
    ///
    /// # Errors
    ///
    /// Any I/O error other than `NotFound` becomes a `BackendError`.
    fn read(&self, key: &str) -> Result<Option<Vec<u8>>, StorageError> {
        match fs::read(self.path_for(key)) {
            Ok(bytes) => Ok(Some(bytes)),
            Err(e) if e.kind() == io::ErrorKind::NotFound => Ok(None),
            Err(e) => Err(io_error("read", &self.dir, e)),
        }
    }

    /// Stores `bytes` under `key`, replacing any previous value.
    ///
    /// The data is written to a temporary file in the same directory and then
    /// renamed onto the target path. No explicit fsync is issued.
    ///
    /// # Errors
    ///
    /// Returns `BackendError` on directory-creation, write, or rename failure,
    /// or when the key would case-collide with an existing key on a
    /// case-insensitive filesystem.
    fn write(&self, key: &str, bytes: &[u8]) -> Result<(), StorageError> {
        fs::create_dir_all(&self.dir).map_err(|e| io_error("mkdir", &self.dir, e))?;
        self.check_case_collision(key)?;
        let target = self.path_for(key);
        let mut tmp =
            NamedTempFile::new_in(&self.dir).map_err(|e| io_error("tempfile", &self.dir, e))?;
        tmp.write_all(bytes)
            .map_err(|e| io_error("write tmp", &self.dir, e))?;
        tmp.persist(&target)
            .map_err(|e| io_error("rename", &self.dir, e.error))?;
        Ok(())
    }

    /// Removes `key`. Deleting a key that does not exist is not an error.
    /// Either way the key's case-collision slot, if it owns one, is released.
    ///
    /// # Errors
    ///
    /// Any I/O error other than `NotFound` becomes a `BackendError`.
    fn delete(&self, key: &str) -> Result<(), StorageError> {
        match fs::remove_file(self.path_for(key)) {
            Ok(()) => {}
            Err(e) if e.kind() == io::ErrorKind::NotFound => {}
            Err(e) => return Err(io_error("delete", &self.dir, e)),
        }
        self.release_case_slot(key);
        Ok(())
    }

    /// Lists stored keys starting with `prefix`, sorted ascending.
    ///
    /// A missing directory yields an empty list. Entries are skipped if they
    /// are not valid UTF-8, are hidden (unless `include_hidden`), or do not
    /// decode. They are also skipped if their filename is not the canonical
    /// encoding of the decoded key, because `read`/`delete` could never reach
    /// such a file. Examples are uppercase hex escapes, stray `%`, and
    /// unescaped punctuation.
    ///
    /// # Errors
    ///
    /// Directory or entry read failures (other than a missing directory)
    /// become a `BackendError`.
    fn list(&self, prefix: &str) -> Result<Vec<String>, StorageError> {
        let entries = match fs::read_dir(&self.dir) {
            Ok(entries) => entries,
            Err(e) if e.kind() == io::ErrorKind::NotFound => return Ok(Vec::new()),
            Err(e) => return Err(io_error("list", &self.dir, e)),
        };
        let mut keys = Vec::new();
        for entry in entries {
            let entry = entry.map_err(|e| io_error("list-entry", &self.dir, e))?;
            let raw = entry.file_name();
            let Some(name) = raw.to_str() else { continue };
            if !self.include_hidden && name.starts_with('.') {
                continue;
            }
            let Some(key) = decode_filename_to_key(name) else {
                continue;
            };
            // Only surface keys that `path_for` would map back to this file.
            if Self::filename_for(&key) != name {
                continue;
            }
            if !key.starts_with(prefix) {
                continue;
            }
            keys.push(key);
        }
        keys.sort_unstable();
        Ok(keys)
    }
}
/// Wraps an I/O failure from operation `op` as a `StorageError::BackendError`.
/// The message names the backend directory, and the original error is kept
/// as the source.
fn io_error(op: &str, dir: &Path, source: io::Error) -> StorageError {
    let location = dir.display();
    let message = format!("file backend {op} failed at {location}: {source}");
    StorageError::BackendError {
        message,
        source: Some(Box::new(source)),
    }
}
/// Percent-encodes `key` into a filesystem-safe stem (no suffix).
///
/// ASCII letters, digits, `_` and `-` pass through unchanged. Every other
/// character is written as `%xx` (lowercase hex) for each byte of its UTF-8
/// encoding. The output is always ASCII.
fn encode_key_to_filename(key: &str) -> String {
    let mut encoded = String::with_capacity(key.len());
    let mut scratch = [0u8; 4];
    for c in key.chars() {
        let passthrough = matches!(c, 'a'..='z' | 'A'..='Z' | '0'..='9' | '_' | '-');
        if passthrough {
            encoded.push(c);
        } else {
            for b in c.encode_utf8(&mut scratch).bytes() {
                encoded.push('%');
                encoded.push(char::from(HEX_LOWER[usize::from(b >> 4)]));
                encoded.push(char::from(HEX_LOWER[usize::from(b & 0x0F)]));
            }
        }
    }
    encoded
}
/// Inverse of `encode_key_to_filename` for a full filename including suffix.
///
/// Returns `None` in three cases: the suffix is missing, the stem contains
/// any non-ASCII character, or the decoded bytes are not valid UTF-8. A `%`
/// not followed by two hex digits (either case) is kept literally.
fn decode_filename_to_key(filename: &str) -> Option<String> {
    let stem = filename.strip_suffix(FILE_SUFFIX)?;
    // The encoder only emits ASCII, and hex digits are ASCII, so a non-ASCII
    // character anywhere means this is not one of our files.
    if !stem.is_ascii() {
        return None;
    }
    let raw = stem.as_bytes();
    let mut decoded: Vec<u8> = Vec::with_capacity(raw.len());
    let mut pos = 0;
    while pos < raw.len() {
        let byte = raw[pos];
        let escape = match (byte, raw.get(pos + 1), raw.get(pos + 2)) {
            (b'%', Some(&hi), Some(&lo)) => nibble(char::from(hi)).zip(nibble(char::from(lo))),
            _ => None,
        };
        if let Some((hi, lo)) = escape {
            decoded.push((hi << 4) | lo);
            pos += 3;
        } else {
            decoded.push(byte);
            pos += 1;
        }
    }
    String::from_utf8(decoded).ok()
}
/// Value of a single ASCII hex digit (`0-9`, `a-f`, `A-F`), or `None`.
fn nibble(c: char) -> Option<u8> {
    let byte = u8::try_from(c).ok()?;
    match byte {
        b'0'..=b'9' => Some(byte - b'0'),
        b'a'..=b'f' => Some(byte - b'a' + 10),
        b'A'..=b'F' => Some(byte - b'A' + 10),
        _ => None,
    }
}
#[must_use]
pub fn file_snapshot<T, C>(
dir: impl AsRef<Path>,
opts: SnapshotStorageOptions<T, C>,
) -> SnapshotStorage<FileBackend, T, C>
where
T: Send + Sync + 'static,
C: Codec<T>,
{
snapshot_storage(Arc::new(FileBackend::new(dir)), opts)
}
#[must_use]
pub fn file_snapshot_default<T>(dir: impl AsRef<Path>) -> SnapshotStorage<FileBackend, T, JsonCodec>
where
T: Serialize + DeserializeOwned + Send + Sync + 'static,
{
file_snapshot(dir, SnapshotStorageOptions::default())
}
#[must_use]
pub fn file_append_log<T, C>(
dir: impl AsRef<Path>,
opts: AppendLogStorageOptions<T, C>,
) -> AppendLogStorage<FileBackend, T, C>
where
T: Serialize + DeserializeOwned + Clone + Send + Sync + 'static,
C: Codec<Vec<T>>,
{
append_log_storage(Arc::new(FileBackend::new(dir)), opts)
}
#[must_use]
pub fn file_append_log_default<T>(
dir: impl AsRef<Path>,
) -> AppendLogStorage<FileBackend, T, JsonCodec>
where
T: Serialize + DeserializeOwned + Clone + Send + Sync + 'static,
{
file_append_log(dir, AppendLogStorageOptions::default())
}
#[must_use]
pub fn file_kv<T, C>(
dir: impl AsRef<Path>,
opts: KvStorageOptions<T, C>,
) -> KvStorage<FileBackend, T, C>
where
T: Send + Sync + 'static,
C: Codec<T>,
{
kv_storage(Arc::new(FileBackend::new(dir)), opts)
}
#[must_use]
pub fn file_kv_default<T>(dir: impl AsRef<Path>) -> KvStorage<FileBackend, T, JsonCodec>
where
T: Serialize + DeserializeOwned + Send + Sync + 'static,
{
file_kv(dir, KvStorageOptions::default())
}
#[cfg(test)]
mod tests {
    //! Unit tests for filename encoding/decoding and case-collision tracking.
    use super::*;

    /// Backend in `dir` with case handling forced rather than probed, so the
    /// collision tests behave identically on every host filesystem.
    fn forced_backend(dir: &Path, insensitive: bool) -> FileBackend {
        FileBackend::new(dir).with_case_insensitive(insensitive)
    }

    #[test]
    fn encode_alphanumeric_passthrough() {
        let key = "abcXYZ-_09";
        assert_eq!(encode_key_to_filename(key), key);
    }

    #[test]
    fn encode_special_chars_percent_escape() {
        let encoded = encode_key_to_filename("app/with:slashes");
        assert_eq!(encoded, "app%2fwith%3aslashes");
    }

    #[test]
    fn encode_non_ascii_two_byte_utf8() {
        let encoded = encode_key_to_filename("café");
        assert_eq!(encoded, "caf%c3%a9");
    }

    #[test]
    fn encode_non_ascii_three_byte_utf8() {
        let encoded = encode_key_to_filename("€100");
        assert_eq!(encoded, "%e2%82%ac100");
    }

    #[test]
    fn encode_emoji_four_byte_utf8() {
        let encoded = encode_key_to_filename("👋");
        assert_eq!(encoded, "%f0%9f%91%8b");
    }

    #[test]
    fn encode_empty_key() {
        assert!(encode_key_to_filename("").is_empty());
    }

    #[test]
    fn decode_round_trip_covers_canonical_set() {
        let keys = [
            "simple",
            "app/with:slashes",
            "café",
            "€100",
            "👋 hello",
            "a-b_c",
            "",
        ];
        for key in keys {
            let filename = format!("{}{FILE_SUFFIX}", encode_key_to_filename(key));
            let decoded = decode_filename_to_key(&filename);
            assert_eq!(decoded.as_deref(), Some(key), "round-trip failed for {key:?}",);
        }
    }

    #[test]
    fn decode_rejects_non_bin_suffix() {
        assert!(decode_filename_to_key("foo.txt").is_none());
        assert!(decode_filename_to_key("foo").is_none());
        // A bare suffix is the canonical filename of the empty key.
        assert!(decode_filename_to_key(".bin").is_some());
    }

    #[test]
    fn decode_truncated_percent_escape_treated_literally() {
        let decoded = decode_filename_to_key("abc%5.bin");
        assert_eq!(decoded.as_deref(), Some("abc%5"));
    }

    #[test]
    fn decode_invalid_hex_treated_literally() {
        let decoded = decode_filename_to_key("abc%5z.bin");
        assert_eq!(decoded.as_deref(), Some("abc%5z"));
    }

    #[test]
    fn decode_uppercase_hex_accepted() {
        let decoded = decode_filename_to_key("caf%C3%A9.bin");
        assert_eq!(decoded.as_deref(), Some("café"));
    }

    #[test]
    fn case_insensitive_rejects_case_divergent_second_write() {
        let dir = tempfile::tempdir().expect("tempdir");
        let backend = forced_backend(dir.path(), true);
        backend
            .write("Foo", b"first")
            .expect("first write must succeed");
        let err = backend
            .write("foo", b"second")
            .expect_err("case-divergent second write must reject");
        let message = match err {
            StorageError::BackendError { message, .. } => message,
            other => panic!("expected StorageError::BackendError, got: {other:?}"),
        };
        assert!(
            message.contains("case-insensitive filesystem collision"),
            "diagnostic must label the failure class, got: {message}"
        );
        let names_both = message.contains("Foo.bin") && message.contains("foo.bin");
        assert!(
            names_both,
            "diagnostic must name both colliding encoded filenames, got: {message}"
        );
    }

    #[test]
    fn case_insensitive_same_casing_overwrites() {
        let dir = tempfile::tempdir().expect("tempdir");
        let backend = forced_backend(dir.path(), true);
        backend.write("Foo", b"first").expect("first write");
        backend
            .write("Foo", b"second")
            .expect("same-casing overwrite must succeed");
        let stored = backend.read("Foo").expect("read").expect("present");
        assert_eq!(stored, b"second");
    }

    #[test]
    fn case_insensitive_delete_releases_slot() {
        let dir = tempfile::tempdir().expect("tempdir");
        let backend = forced_backend(dir.path(), true);
        backend.write("Foo", b"first").expect("write Foo");
        backend.delete("Foo").expect("delete Foo");
        backend.write("foo", b"new").expect("post-delete write foo");
        let stored = backend.read("foo").expect("read foo").expect("present");
        assert_eq!(stored, b"new");
    }

    #[test]
    fn case_sensitive_allows_case_divergent_writes() {
        let dir = tempfile::tempdir().expect("tempdir");
        let backend = forced_backend(dir.path(), false);
        backend.write("Foo", b"first").expect("write Foo");
        backend
            .write("foo", b"second")
            .expect("forced-sensitive backend must not reject case-divergent keys");
    }

    #[test]
    fn decode_rejects_non_ascii_outside_escapes() {
        assert_eq!(decode_filename_to_key("café.bin"), None);
    }

    #[test]
    fn nibble_validates_hex_set() {
        let valid = ['0', '5', '9', 'a', 'f', 'A', 'F'];
        let invalid = ['g', 'G', '/', '@', '\u{00e9}'];
        valid
            .into_iter()
            .for_each(|c| assert!(nibble(c).is_some(), "{c} should be a hex digit"));
        invalid
            .into_iter()
            .for_each(|c| assert!(nibble(c).is_none(), "{c} should not be a hex digit"));
    }
}