use std::cmp;
use std::collections::HashSet;
use std::ffi::OsString;
use std::fmt;
use std::fs;
use std::fs::OpenOptions;
use std::os::unix::ffi::OsStrExt;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::thread;
use std::time::Duration;
use std::time::SystemTime;
use std::{collections::HashMap, pin::Pin};
use anyhow::Context as _;
use async_trait::async_trait;
use futures::StreamExt;
use notify::{Config, Event, EventKind, RecommendedWatcher, RecursiveMode, Watcher, event};
use parking_lot::Mutex;
use tokio_util::sync::CancellationToken;
use super::{Bucket, Key, KeyValue, Store, StoreError, StoreOutcome, WatchEvent};
// Default time-to-live for a bucket's files when the caller does not supply one.
const DEFAULT_TTL: Duration = Duration::from_secs(10);
// Lower bound for both a bucket's ttl and the expiry thread's sleep interval.
const MIN_KEEP_ALIVE: Duration = Duration::from_secs(1);
// Prefix of the temporary files used to make `insert` atomic (write + rename);
// `entries` and `watch` skip files carrying this prefix.
const TEMP_FILE_PREFIX: &str = ".tmp_";
/// Filesystem-backed key/value store: each bucket is a directory under `root`
/// and each key/value pair is a file inside it. Cloning is cheap — clones
/// share the same active-directory map via `Arc`.
#[derive(Clone)]
pub struct FileStore {
// Cancels the background expiry thread spawned in `new`.
cancel_token: CancellationToken,
// Base directory that holds all bucket directories.
root: PathBuf,
// Random id identifying this store instance; returned by `connection_id`.
connection_id: u64,
// Buckets currently in use; the expiry thread keeps their owned files
// alive and deletes expired files from them.
active_dirs: Arc<Mutex<HashMap<PathBuf, Directory>>>,
}
impl FileStore {
    /// Builds a store rooted at `root_dir` and spawns the background expiry
    /// thread; the thread exits once `cancel_token` is cancelled.
    pub(super) fn new<P: Into<PathBuf>>(cancel_token: CancellationToken, root_dir: P) -> Self {
        let fs = FileStore {
            cancel_token,
            root: root_dir.into(),
            connection_id: rand::random::<u64>(),
            active_dirs: Arc::new(Mutex::new(HashMap::new())),
        };
        let background = fs.clone();
        thread::spawn(move || background.expiry_thread());
        fs
    }

    /// Background loop: sleeps a third of the shortest active ttl (but at
    /// least `MIN_KEEP_ALIVE`), then refreshes owned files and deletes
    /// expired ones. Runs until the cancellation token fires.
    fn expiry_thread(&self) {
        loop {
            let ttl = self.shortest_ttl();
            let keep_alive_interval = cmp::max(ttl / 3, MIN_KEEP_ALIVE);
            if self.cancel_token.is_cancelled() {
                break;
            }
            thread::sleep(keep_alive_interval);
            // Re-check after sleeping so shutdown is not delayed a full cycle
            // of keep-alive I/O.
            if self.cancel_token.is_cancelled() {
                break;
            }
            self.keep_alive();
            if let Err(err) = self.delete_expired_files() {
                tracing::error!(error = %err, "FileStore delete_expired_files");
            }
        }
    }

    /// Shortest ttl across all active buckets, capped at `DEFAULT_TTL`;
    /// used to size the expiry thread's sleep interval.
    fn shortest_ttl(&self) -> Duration {
        // Take the minimum under the lock instead of cloning the whole map
        // (the previous version cloned every Directory just to read `ttl`).
        let ttl = self
            .active_dirs
            .lock()
            .values()
            .map(|dir| dir.ttl)
            .fold(DEFAULT_TTL, cmp::min);
        tracing::trace!("FileStore expiry shortest ttl {ttl:?}");
        ttl
    }

    /// Refreshes the mtime of every owned file in every active bucket.
    fn keep_alive(&self) {
        // Snapshot the map so no lock is held during file I/O.
        let active_dirs = self.active_dirs.lock().clone();
        for (_, dir) in active_dirs {
            dir.keep_alive();
        }
    }

    /// Deletes expired files from every active bucket, attaching the bucket
    /// path as error context on failure.
    fn delete_expired_files(&self) -> anyhow::Result<()> {
        // Snapshot the map so no lock is held during file I/O.
        let active_dirs = self.active_dirs.lock().clone();
        for (path, dir) in active_dirs {
            dir.delete_expired_files()
                .with_context(|| path.display().to_string())?;
        }
        Ok(())
    }
}
#[async_trait]
impl Store for FileStore {
    type Bucket = Directory;

    /// Returns the bucket for `bucket_name`, creating its directory on disk
    /// and registering it with the expiry thread if needed.
    ///
    /// `ttl` only applies when the bucket is first registered; later callers
    /// receive the existing `Directory` with its original ttl. Fails if the
    /// path exists but is not a directory.
    async fn get_or_create_bucket(
        &self,
        bucket_name: &str,
        ttl: Option<Duration>,
    ) -> Result<Self::Bucket, StoreError> {
        let p = self.root.join(bucket_name);
        if let Some(dir) = self.active_dirs.lock().get(&p) {
            return Ok(dir.clone());
        }
        if p.exists() {
            if !p.is_dir() {
                return Err(StoreError::FilesystemError(
                    "Bucket name is not a directory".to_string(),
                ));
            }
        } else {
            fs::create_dir_all(&p).map_err(to_fs_err)?;
        }
        // Register via the entry API so that a `Directory` registered
        // concurrently (while the lock was released above) wins. A plain
        // `insert` would overwrite it, leaving the earlier caller with a
        // bucket whose owned files the expiry thread never keeps alive.
        let ttl = ttl.unwrap_or(DEFAULT_TTL);
        let dir = self
            .active_dirs
            .lock()
            .entry(p.clone())
            .or_insert_with(|| Directory::new(self.root.clone(), p, ttl))
            .clone();
        Ok(dir)
    }

    /// Returns the bucket for `bucket_name` if its directory already exists,
    /// without creating it. A bucket found on disk is registered with
    /// `DEFAULT_TTL`. Fails if the path exists but is not a directory.
    async fn get_bucket(&self, bucket_name: &str) -> Result<Option<Self::Bucket>, StoreError> {
        let p = self.root.join(bucket_name);
        if let Some(dir) = self.active_dirs.lock().get(&p) {
            return Ok(Some(dir.clone()));
        }
        if !p.exists() {
            return Ok(None);
        }
        if !p.is_dir() {
            return Err(StoreError::FilesystemError(
                "Bucket name is not a directory".to_string(),
            ));
        }
        // Same entry-based registration as `get_or_create_bucket`, so a
        // concurrently registered `Directory` is reused rather than replaced.
        let dir = self
            .active_dirs
            .lock()
            .entry(p.clone())
            .or_insert_with(|| Directory::new(self.root.clone(), p, DEFAULT_TTL))
            .clone();
        Ok(Some(dir))
    }

    /// Random id generated once per store instance in `new`.
    fn connection_id(&self) -> u64 {
        self.connection_id
    }

    /// Drops all active buckets and best-effort deletes the files they own;
    /// failures are logged rather than propagated.
    fn shutdown(&self) {
        for (_, mut dir) in self.active_dirs.lock().drain() {
            if let Err(err) = dir.delete_owned_files() {
                tracing::error!(error = %err, %dir, "Failed shutdown delete of owned files");
            }
        }
    }
}
/// A single bucket: a directory under the store root whose files are the
/// key/value entries. Clones share the same owned-files set via `Arc`.
#[derive(Clone)]
pub struct Directory {
// Canonicalized store root; keys are derived by stripping it from paths.
root: PathBuf,
// Absolute path of this bucket's directory.
p: PathBuf,
// Files whose mtime is older than this are removed by the expiry sweep.
ttl: Duration,
// Files written through this bucket; their mtimes are refreshed by
// `keep_alive` and they are removed on store shutdown.
owned_files: Arc<Mutex<HashSet<PathBuf>>>,
}
impl Directory {
fn new(root: PathBuf, p: PathBuf, ttl: Duration) -> Self {
let canonical_root = root.canonicalize().unwrap_or_else(|_| root.clone());
if ttl < MIN_KEEP_ALIVE {
let h_ttl = humantime::format_duration(ttl);
tracing::warn!(path = %p.display(), ttl = %h_ttl, "ttl is too short, increasing to {}", humantime::format_duration(MIN_KEEP_ALIVE));
}
let ttl = cmp::max(ttl, MIN_KEEP_ALIVE);
Directory {
root: canonical_root,
p,
ttl,
owned_files: Arc::new(Mutex::new(HashSet::new())),
}
}
fn keep_alive(&self) {
let owned_files = self.owned_files.lock().clone();
for path in owned_files {
let file = match OpenOptions::new().write(true).open(&path) {
Ok(f) => f,
Err(err) => {
tracing::error!(path = %path.display(), error = %err, "FileStore::keep_alive failed opening owned file");
continue;
}
};
if let Err(err) = file.set_modified(SystemTime::now()) {
tracing::error!(path = %path.display(), error = %err, "FileStore::keep_alive failed set_modified on owned file");
continue;
}
tracing::trace!("FileStore keep_alive set {}", path.display());
}
}
fn delete_expired_files(&self) -> anyhow::Result<()> {
let deadline = SystemTime::now() - self.ttl;
let dirname = self.p.display().to_string();
for entry in fs::read_dir(&self.p).with_context(|| dirname.clone())? {
let entry = match entry {
Ok(p) => p,
Err(err) => {
tracing::warn!(dir = dirname, error = %err, "File store could read directory contents");
continue;
}
};
if !entry.file_type().map(|f| f.is_file()).unwrap_or(false) {
tracing::warn!(dir = dirname, entry = %entry.path().display(), "File store directory should only contain files");
continue;
}
let ctx = entry.path().display().to_string();
let metadata = match entry.metadata() {
Ok(m) => m,
Err(err) => {
tracing::warn!(path = %ctx, error = %err, "Failed fetching metadata");
continue;
}
};
let last_modified = match metadata.modified() {
Ok(lm) => lm,
Err(err) => {
tracing::warn!(path = %ctx, error = %err, "Failed reading mtime");
continue;
}
};
if last_modified < deadline {
tracing::info!(path = ctx, ?last_modified, "Expired");
if let Err(err) = fs::remove_file(entry.path()) {
tracing::warn!(path = %ctx, error = %err, "Failed removing");
}
}
}
Ok(())
}
fn delete_owned_files(&mut self) -> anyhow::Result<()> {
let mut errs = Vec::new();
for p in self.owned_files.lock().drain() {
if let Err(err) = fs::remove_file(&p) {
errs.push(format!("{}: {err}", p.display()));
}
}
if !errs.is_empty() {
anyhow::bail!(errs.join(", "));
}
Ok(())
}
}
/// Renders the bucket as its directory path.
impl fmt::Display for Directory {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        // Delegate directly to the path's display adapter.
        fmt::Display::fmt(&self.p.display(), f)
    }
}
#[async_trait]
impl Bucket for Directory {
async fn insert(
&self,
key: &Key,
value: bytes::Bytes,
_revision: u64, ) -> Result<StoreOutcome, StoreError> {
let safe_key = key.url_safe();
let full_path = self.p.join(safe_key.as_ref());
let str_path = full_path.display().to_string();
let temp_name = format!("{TEMP_FILE_PREFIX}{:016x}", rand::random::<u64>());
let temp_path = self.p.join(&temp_name);
fs::write(&temp_path, &value)
.with_context(|| format!("writing temp file {}", temp_path.display()))
.map_err(a_to_fs_err)?;
fs::rename(&temp_path, &full_path)
.with_context(|| format!("renaming {} to {}", temp_path.display(), str_path))
.map_err(a_to_fs_err)?;
self.owned_files.lock().insert(full_path.clone());
Ok(StoreOutcome::Created(0))
}
async fn get(&self, key: &Key) -> Result<Option<bytes::Bytes>, StoreError> {
let safe_key = key.url_safe();
let full_path = self.p.join(safe_key.as_ref());
if !full_path.exists() {
return Ok(None);
}
let str_path = full_path.display().to_string();
let data: bytes::Bytes = fs::read(&full_path)
.context(str_path)
.map_err(a_to_fs_err)?
.into();
Ok(Some(data))
}
async fn delete(&self, key: &Key) -> Result<(), StoreError> {
let safe_key = key.url_safe();
let full_path = self.p.join(safe_key.as_ref());
let str_path = full_path.display().to_string();
if !full_path.exists() {
return Err(StoreError::MissingKey(str_path));
}
self.owned_files.lock().remove(&full_path);
fs::remove_file(&full_path)
.context(str_path)
.map_err(a_to_fs_err)
}
async fn watch(
&self,
) -> Result<Pin<Box<dyn futures::Stream<Item = WatchEvent> + Send + 'life0>>, StoreError> {
let (tx, mut rx) = tokio::sync::mpsc::channel(128);
let mut watcher = RecommendedWatcher::new(
move |res: Result<Event, notify::Error>| {
if let Err(err) = tx.blocking_send(res) {
tracing::error!(error = %err, "Failed to send file watch event");
}
},
Config::default(),
)
.map_err(to_fs_err)?;
watcher
.watch(&self.p, RecursiveMode::NonRecursive)
.map_err(to_fs_err)?;
let dir = self.p.clone();
let root = self.root.clone();
Ok(Box::pin(async_stream::stream! {
let _watcher = watcher;
while let Some(event_result) = rx.recv().await {
let event = match event_result {
Ok(event) => event,
Err(err) => {
tracing::error!(error = %err, "Failed receiving file watch event");
continue;
}
};
for item_path in event.paths {
if item_path == dir {
tracing::warn!("Unexpected event on the directory itself");
continue;
}
let canonical_item_path = item_path.canonicalize().unwrap_or_else(|_| item_path.clone());
let key = match canonical_item_path.strip_prefix(&root) {
Ok(stripped) => Key::from_url_safe(&stripped.display().to_string()),
Err(err) => {
tracing::error!(
error = %err,
item_path = %canonical_item_path.display(),
root = %root.display(),
"Item in file store is not prefixed with file store root. Should be impossible. Ignoring invalid key.");
continue;
}
};
if item_path.file_name()
.map(|n| n.to_string_lossy().starts_with(TEMP_FILE_PREFIX))
.unwrap_or(false)
{
continue;
}
match event.kind {
EventKind::Create(event::CreateKind::File)
| EventKind::Modify(event::ModifyKind::Data(event::DataChange::Content))
| EventKind::Modify(event::ModifyKind::Name(event::RenameMode::To)) => {
let data: bytes::Bytes = match fs::read(&item_path) {
Ok(data) => data.into(),
Err(err) => {
tracing::warn!(error = %err, item = %item_path.display(), "Failed reading event item. Skipping.");
continue;
}
};
let item = KeyValue::new(key, data);
yield WatchEvent::Put(item);
}
EventKind::Remove(event::RemoveKind::File) => {
yield WatchEvent::Delete(key);
}
_ => {
continue;
}
}
}
}
}))
}
async fn entries(&self) -> Result<HashMap<Key, bytes::Bytes>, StoreError> {
let contents = fs::read_dir(&self.p)
.with_context(|| self.p.display().to_string())
.map_err(a_to_fs_err)?;
let mut out = HashMap::new();
for entry in contents {
let entry = entry.map_err(to_fs_err)?;
if !entry.path().is_file() {
tracing::warn!(
path = %entry.path().display(),
"Unexpected entry, directory should only contain files."
);
continue;
}
if entry
.file_name()
.to_string_lossy()
.starts_with(TEMP_FILE_PREFIX)
{
continue;
}
let canonical_entry_path = match entry.path().canonicalize() {
Ok(p) => p,
Err(err) => {
tracing::warn!(error = %err, path = %entry.path().display(), "Failed to canonicalize path. Using original path.");
entry.path()
}
};
let key = match canonical_entry_path.strip_prefix(&self.root) {
Ok(p) => Key::from_url_safe(&p.to_string_lossy()),
Err(err) => {
tracing::error!(
error = %err,
path = %canonical_entry_path.display(),
root = %self.root.display(),
"FileStore path not in root. Should be impossible. Skipping entry."
);
continue;
}
};
let data: bytes::Bytes = fs::read(entry.path())
.with_context(|| self.p.display().to_string())
.map_err(a_to_fs_err)?
.into();
out.insert(key, data);
}
Ok(out)
}
}
/// Converts an `anyhow` error — including its full context chain (`{:#}`) —
/// into a filesystem `StoreError`.
fn a_to_fs_err(err: anyhow::Error) -> StoreError {
    let message = format!("{err:#}");
    StoreError::FilesystemError(message)
}
/// Converts any standard error into a filesystem `StoreError` using its
/// `Display` rendering.
fn to_fs_err<E: std::error::Error>(err: E) -> StoreError {
    let message = err.to_string();
    StoreError::FilesystemError(message)
}
#[cfg(test)]
mod tests {
use std::collections::HashSet;
use tokio_util::sync::CancellationToken;
use crate::storage::kv::{Bucket as _, FileStore, Key, Store as _};
// Verifies that `entries` returns keys as store-root-relative paths, i.e.
// prefixed with the bucket name ("v1/tests"), including multi-segment keys.
#[tokio::test]
async fn test_entries_full_path() {
let t = tempfile::tempdir().unwrap();
let cancel_token = CancellationToken::new();
let m = FileStore::new(cancel_token.clone(), t.path());
let bucket = m.get_or_create_bucket("v1/tests", None).await.unwrap();
let _ = bucket
.insert(&Key::new("key1/multi/part".to_string()), "value1".into(), 0)
.await
.unwrap();
let _ = bucket
.insert(&Key::new("key2".to_string()), "value2".into(), 0)
.await
.unwrap();
let entries = bucket.entries().await.unwrap();
let keys: HashSet<Key> = entries.into_keys().collect();
// Stop the expiry thread before asserting so the tempdir can be dropped.
cancel_token.cancel();
assert!(keys.contains(&Key::new("v1/tests/key1/multi/part".to_string())));
assert!(keys.contains(&Key::new("v1/tests/key2".to_string())));
}
}