use std::{
cell::Cell,
cmp::PartialEq,
collections::{HashMap, HashSet},
fs,
ops::IndexMut,
path::{Path, PathBuf},
str,
sync::{
atomic::{AtomicBool, Ordering},
Arc, RwLock,
},
thread,
time::Duration,
};
use distill_core::utils::{self, canonicalize_path};
use distill_schema::data::{self, dirty_file_info, rename_file_event, source_file_info, FileType};
use event_listener::Event;
use futures::{
channel::mpsc::{unbounded, UnboundedReceiver, UnboundedSender},
stream::StreamExt,
FutureExt,
};
use lmdb::Cursor;
use log::{debug, info};
use tokio::{
sync::{mpsc, Mutex},
time,
};
use crate::{
capnp_db::{
CapnpCursor, DBTransaction, Environment, MessageReader, RoTransaction, RwTransaction,
},
error::{Error, Result},
watcher::{self, FileEvent, FileMetadata},
};
#[derive(Clone)]
struct FileTrackerTables {
source_files: lmdb::Database,
dirty_files: lmdb::Database,
rename_file_events: lmdb::Database,
}
#[derive(Copy, Clone, Debug)]
pub enum FileTrackerEvent {
Start,
Update,
}
pub struct FileTracker {
db: Arc<Environment>,
tables: FileTrackerTables,
listener_rx: Mutex<Cell<UnboundedReceiver<UnboundedSender<FileTrackerEvent>>>>,
listener_tx: UnboundedSender<UnboundedSender<FileTrackerEvent>>,
is_running: AtomicBool,
stopping_event: event_listener::Event,
watch_dirs: RwLock<Vec<PathBuf>>,
}
#[derive(Clone)]
pub struct FileState {
pub path: PathBuf,
pub state: data::FileState,
pub last_modified: u64,
pub length: u64,
pub ty: data::FileType,
}
impl std::fmt::Debug for FileState {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
use capnp::traits::ToU16;
f.debug_struct("FileState")
.field("path", &self.path)
.field("state", &self.state)
.field("last_modified", &self.last_modified)
.field("length", &self.length)
.field("ty", &self.ty.to_u16())
.finish()
}
}
impl PartialEq for FileState {
fn eq(&self, other: &FileState) -> bool {
self.path == other.path
&& self.state == other.state
&& self.last_modified == other.last_modified
&& self.length == other.length
}
}
#[derive(Clone, Debug)]
pub struct RenameFileEvent {
pub src: PathBuf,
pub dst: PathBuf,
}
struct ScanContext {
path: PathBuf,
files: HashMap<PathBuf, FileMetadata>,
}
pub fn db_file_type(t: fs::FileType) -> FileType {
if t.is_dir() {
FileType::Directory
} else if t.is_symlink() {
FileType::Symlink
} else {
FileType::File
}
}
struct ListenersList {
listeners: Vec<UnboundedSender<FileTrackerEvent>>,
}
impl ListenersList {
fn new() -> Self {
Self {
listeners: Vec::new(),
}
}
fn register(&mut self, new_listener: Option<UnboundedSender<FileTrackerEvent>>) {
if let Some(new_listener) = new_listener {
self.listeners.push(new_listener);
}
}
fn send_event(&mut self, event: FileTrackerEvent) {
self.listeners.retain(|listener| {
match listener.unbounded_send(event) {
Ok(()) => {
debug!("Sent to listener");
true
}
Err(_) => {
debug!("Listener dropped");
false
}
}
})
}
}
fn add_rename_event(
tables: &FileTrackerTables,
txn: &mut RwTransaction<'_>,
src: &[u8],
dst: &[u8],
) -> Result<()> {
let mut last_seq: u64 = 0;
let last_element = txn
.open_ro_cursor(tables.rename_file_events)
.expect("Failed to open RO cursor for rename_file_events")
.capnp_iter_start()
.last();
if let Some((key, _)) = last_element {
last_seq = u64::from_le_bytes(utils::make_array(key));
}
let mut value_builder = capnp::message::Builder::new_default();
{
let mut value = value_builder.init_root::<rename_file_event::Builder<'_>>();
value.set_src(src);
value.set_dst(dst);
}
last_seq += 1;
txn.put(
tables.rename_file_events,
&last_seq.to_le_bytes(),
&value_builder,
)?;
Ok(())
}
fn build_dirty_file_info(
state: data::FileState,
source_info: source_file_info::Reader<'_>,
) -> capnp::message::Builder<capnp::message::HeapAllocator> {
let mut value_builder = capnp::message::Builder::new_default();
{
let mut value = value_builder.init_root::<dirty_file_info::Builder<'_>>();
value.set_state(state);
value
.set_source_info(source_info)
.expect("failed to set source info");
}
value_builder
}
fn build_source_info(
metadata: &watcher::FileMetadata,
) -> capnp::message::Builder<capnp::message::HeapAllocator> {
let mut value_builder = capnp::message::Builder::new_default();
{
let mut value = value_builder.init_root::<source_file_info::Builder<'_>>();
value.set_last_modified(metadata.last_modified);
value.set_length(metadata.length);
value.set_type(db_file_type(metadata.file_type));
}
value_builder
}
fn update_deleted_dirty_entry<K>(
txn: &mut RwTransaction<'_>,
tables: &FileTrackerTables,
key: &K,
) -> Result<()>
where
K: AsRef<[u8]>,
{
let dirty_value = {
txn.get::<source_file_info::Owned, K>(tables.source_files, key)?
.map(|v| {
let info = v.get().expect("failed to get source_file_info");
build_dirty_file_info(data::FileState::Deleted, info)
})
};
if dirty_value.is_some() {
txn.put(tables.dirty_files, key, &dirty_value.unwrap())?;
}
Ok(())
}
mod events {
use std::path::Path;
use super::*;
fn handle_update(
txn: &mut RwTransaction<'_>,
tables: &FileTrackerTables,
path: &Path,
metadata: &watcher::FileMetadata,
scan_stack: &mut Vec<ScanContext>,
) -> Result<()> {
let path_str = path.to_string_lossy();
let key = path_str.as_bytes();
let mut changed = true;
{
let maybe_msg: Option<MessageReader<'_, source_file_info::Owned>> =
txn.get(tables.source_files, &key)?;
if let Some(msg) = maybe_msg {
let info = msg.get()?;
if info.get_length() == metadata.length
&& info.get_last_modified() == metadata.last_modified
&& info.get_type()? == db_file_type(metadata.file_type)
{
changed = false;
} else {
debug!("CHANGED {} metadata {:?}", path_str, metadata);
}
}
}
if !scan_stack.is_empty() {
let head_idx = scan_stack.len() - 1;
let scan_ctx = scan_stack.index_mut(head_idx);
scan_ctx.files.insert(path.to_path_buf(), metadata.clone());
}
if changed {
let value = build_source_info(&metadata);
let dirty_value = build_dirty_file_info(
data::FileState::Exists,
value.get_root_as_reader::<source_file_info::Reader<'_>>()?,
);
txn.put(tables.source_files, &key, &value)?;
txn.put(tables.dirty_files, &key, &dirty_value)?;
}
Ok(())
}
pub(super) fn handle_file_event(
txn: &mut RwTransaction<'_>,
tables: &FileTrackerTables,
evt: watcher::FileEvent,
scan_stack: &mut Vec<ScanContext>,
watch_dirs: &RwLock<Vec<PathBuf>>,
) -> Result<Option<FileTrackerEvent>> {
match evt {
FileEvent::Updated(path, metadata) => {
handle_update(txn, tables, &path, &metadata, scan_stack)?;
}
FileEvent::Renamed(src, dst, metadata) => {
if !scan_stack.is_empty() {
let head_idx = scan_stack.len() - 1;
let scan_ctx = scan_stack.index_mut(head_idx);
scan_ctx.files.insert(dst.clone(), metadata.clone());
scan_ctx.files.remove(&src);
}
let src_str = src.to_string_lossy();
let src_key = src_str.as_bytes();
let dst_str = dst.to_string_lossy();
let dst_key = dst_str.as_bytes();
debug!("rename {} to {} metadata {:?}", src_str, dst_str, metadata);
let value = build_source_info(&metadata);
txn.delete(tables.source_files, &src_key)?;
txn.put(tables.source_files, &dst_key, &value)?;
let dirty_value_new = build_dirty_file_info(
data::FileState::Exists,
value.get_root_as_reader::<source_file_info::Reader<'_>>()?,
);
let dirty_value_old = build_dirty_file_info(
data::FileState::Deleted,
value.get_root_as_reader::<source_file_info::Reader<'_>>()?,
);
txn.put(tables.dirty_files, &src_key, &dirty_value_old)?;
txn.put(tables.dirty_files, &dst_key, &dirty_value_new)?;
add_rename_event(tables, txn, &src_key, &dst_key)?;
}
FileEvent::Removed(path) => {
if !scan_stack.is_empty() {
let head_idx = scan_stack.len() - 1;
let scan_ctx = scan_stack.index_mut(head_idx);
scan_ctx.files.remove(&path);
}
let path_str = path.to_string_lossy();
let key = path_str.as_bytes();
debug!("removed {}", path_str);
update_deleted_dirty_entry(txn, &tables, &key)?;
txn.delete(tables.source_files, &key)?;
}
FileEvent::FileError(err) => {
debug!("file event error: {}", err);
return Err(err);
}
FileEvent::ScanStart(path) => {
debug!("scan start: {}", path.to_string_lossy());
scan_stack.push(ScanContext {
path,
files: HashMap::new(),
});
}
FileEvent::ScanEnd(path, watched_dirs) => {
let scan_ctx = scan_stack.pop().unwrap();
let mut db_file_set = HashSet::new();
{
let path_str = path.to_string_lossy();
let key = path_str.as_bytes();
let path_string = scan_ctx.path.to_string_lossy().into_owned();
let cursor = txn
.open_ro_cursor(tables.source_files)
.expect("Failed to open RO cursor for source_files table");
for (key, _) in cursor.capnp_iter_from(&key) {
let key = str::from_utf8(key).expect("Encoded key was invalid utf8");
if !key.starts_with(&path_string) {
break;
}
db_file_set.insert(PathBuf::from(key));
}
}
let scan_ctx_set: HashSet<PathBuf> = scan_ctx.files.keys().cloned().collect();
let to_remove = db_file_set.difference(&scan_ctx_set);
for p in to_remove {
let p_str = p.to_string_lossy();
let p_key = p_str.as_bytes();
update_deleted_dirty_entry(txn, &tables, &p_key)?;
txn.delete(tables.source_files, &p_key)?;
}
info!(
"Scanned and compared {} + {}, deleted {}",
scan_ctx_set.len(),
db_file_set.len(),
db_file_set.difference(&scan_ctx_set).count()
);
if scan_stack.is_empty() {
let mut to_delete = Vec::new();
{
let mut cursor = txn
.open_ro_cursor(tables.source_files)
.expect("Failed to open RO cursor for source_files table");
let dirs_as_strings: Vec<String> = watched_dirs
.into_iter()
.map(|f| f.to_string_lossy().into_owned())
.collect();
for iter_result in cursor.iter_start() {
let (key_bytes, _) =
iter_result.expect("Error while iterating source file metadata");
let key =
str::from_utf8(key_bytes).expect("Encoded key was invalid utf8");
if !dirs_as_strings.iter().any(|dir| key.starts_with(dir)) {
to_delete.push(key);
}
}
}
for key in to_delete {
txn.delete(tables.source_files, &key)?;
update_deleted_dirty_entry(txn, &tables, &key)?;
}
}
debug!("scan end: {}", path.to_string_lossy());
return Ok(Some(FileTrackerEvent::Start));
}
FileEvent::Watch(path) => {
let mut watch_dirs = watch_dirs.write().expect("watch_dirs lock poisoned");
watch_dirs.push(path);
}
FileEvent::Unwatch(path) => {
let mut watch_dirs = watch_dirs.write().expect("watch_dirs lock poisoned");
watch_dirs.retain(|p| p != &path);
}
}
Ok(None)
}
}
impl FileTracker {
pub fn new<'a, I, T>(db: Arc<Environment>, to_watch: I) -> FileTracker
where
I: IntoIterator<Item = &'a str, IntoIter = T>,
T: Iterator<Item = &'a str>,
{
let watch_dirs: Vec<PathBuf> = to_watch
.into_iter()
.map(|s| {
let path = PathBuf::from(s);
let path = if path.is_relative() {
std::env::current_dir()
.expect("failed to get current dir")
.join(path)
} else {
path
};
canonicalize_path(&path)
})
.collect();
let source_files = db
.create_db(Some("source_files"), lmdb::DatabaseFlags::default())
.expect("db: Failed to create source_files table");
let dirty_files = db
.create_db(Some("dirty_files"), lmdb::DatabaseFlags::default())
.expect("db: Failed to create dirty_files table");
let rename_file_events = db
.create_db(Some("rename_file_events"), lmdb::DatabaseFlags::INTEGER_KEY)
.expect("db: Failed to create rename_file_events table");
let (listener_tx, listener_rx) = unbounded();
FileTracker {
is_running: AtomicBool::new(false),
stopping_event: Event::new(),
tables: FileTrackerTables {
source_files,
dirty_files,
rename_file_events,
},
db,
listener_rx: Mutex::new(Cell::new(listener_rx)),
listener_tx,
watch_dirs: RwLock::new(watch_dirs),
}
}
pub fn make_relative_path(&self, absolute_path: &Path) -> Option<PathBuf> {
for dir in self.get_watch_dirs() {
let canonicalized_dir = canonicalize_path(&dir);
if absolute_path.starts_with(&canonicalized_dir) {
let relative_path = absolute_path
.strip_prefix(canonicalized_dir)
.expect("error stripping prefix")
.to_path_buf();
let relative_path = canonicalize_path(&relative_path)
.to_string_lossy()
.replace("\\", "/");
return Some(PathBuf::from(relative_path));
}
}
None
}
pub fn get_watch_dirs(&self) -> Vec<PathBuf> {
self.watch_dirs
.read()
.expect("watch_dirs lock poisoned")
.iter()
.cloned()
.collect()
}
pub async fn get_rw_txn(&self) -> RwTransaction<'_> {
self.db.rw_txn().await.expect("db: Failed to open rw txn")
}
pub fn read_rename_events<'a, V: DBTransaction<'a, T>, T: lmdb::Transaction + 'a>(
&self,
iter_txn: &'a V,
) -> Vec<(u64, RenameFileEvent)> {
iter_txn
.open_ro_cursor(self.tables.rename_file_events)
.expect("db: Failed to open ro cursor for rename_file_events table")
.capnp_iter_start()
.filter_map(|(key, val)| {
let val = val.ok()?;
let evt = val.into_typed::<rename_file_event::Owned>();
let evt = evt.get().ok()?;
let seq_num = u64::from_le_bytes(utils::make_array(key));
let src_raw = evt.get_src().ok()?;
let src = PathBuf::from(str::from_utf8(src_raw).ok()?);
let dst_raw = evt.get_dst().ok()?;
let dst = PathBuf::from(str::from_utf8(dst_raw).ok()?);
Some((seq_num, RenameFileEvent { src, dst }))
})
.collect()
}
pub fn clear_rename_events(&self, txn: &mut RwTransaction<'_>) {
txn.clear_db(self.tables.rename_file_events)
.expect("db: Failed to clear rename_file_events table");
}
pub async fn add_dirty_file(&self, txn: &mut RwTransaction<'_>, path: &Path) -> Result<()> {
let metadata = match tokio::fs::metadata(path).await {
Err(ref e) if e.kind() == std::io::ErrorKind::NotFound => None,
Err(e) => return Err(Error::IO(e)),
Ok(metadata) => Some(watcher::file_metadata(&metadata)),
};
let path_str = path.to_string_lossy();
let key = path_str.as_bytes();
if let Some(metadata) = metadata {
let source_info = build_source_info(&metadata);
let dirty_file_info = build_dirty_file_info(
data::FileState::Exists,
source_info.get_root_as_reader::<source_file_info::Reader<'_>>()?,
);
txn.put(self.tables.source_files, &key, &source_info)?;
txn.put(self.tables.dirty_files, &key, &dirty_file_info)?;
} else {
update_deleted_dirty_entry(txn, &self.tables, &key)?;
}
Ok(())
}
pub fn read_dirty_files<'a, V: DBTransaction<'a, T>, T: lmdb::Transaction + 'a>(
&self,
iter_txn: &'a V,
) -> Vec<FileState> {
iter_txn
.open_ro_cursor(self.tables.dirty_files)
.expect("db: Failed to open ro cursor for dirty_files table")
.capnp_iter_start()
.filter_map(|(key, val)| {
let key = str::from_utf8(key).expect("utf8: Failed to parse file path");
let val = val.expect("capnp: Failed to get value in iterator");
let info = val.get_root::<dirty_file_info::Reader<'_>>().ok()?;
let source_info = info
.get_source_info()
.expect("capnp: Failed to get source info");
Some(FileState {
path: PathBuf::from(key),
state: info.get_state().ok()?,
last_modified: source_info.get_last_modified(),
length: source_info.get_length(),
ty: source_info
.get_type()
.expect("Failed to read type in source file info"),
})
})
.collect()
}
pub fn read_all_files(&self, iter_txn: &RoTransaction<'_>) -> Vec<FileState> {
iter_txn
.open_ro_cursor(self.tables.source_files)
.expect("db: Failed to open ro cursor for source_files table")
.capnp_iter_start()
.filter_map(|(key, val)| {
let key = str::from_utf8(key).expect("utf8: Failed to parse file path");
let val = val.expect("capnp: Failed to get value in iterator");
let info = val.get_root::<source_file_info::Reader<'_>>().ok()?;
Some(FileState {
path: PathBuf::from(key),
state: data::FileState::Exists,
last_modified: info.get_last_modified(),
length: info.get_length(),
ty: info
.get_type()
.expect("Failed to read type in source file info"),
})
})
.collect()
}
pub fn delete_dirty_file_state<'a>(&self, txn: &'a mut RwTransaction<'_>, path: &Path) -> bool {
let key_str = path.to_string_lossy();
let key = key_str.as_bytes();
txn.delete(self.tables.dirty_files, &key)
.expect("db: Failed to delete entry from dirty_files table")
}
#[cfg(not(target_os = "macos"))] #[cfg(test)]
pub fn get_dirty_file_state<'a, V: DBTransaction<'a, T>, T: lmdb::Transaction + 'a>(
&self,
txn: &'a V,
path: &Path,
) -> Option<FileState> {
let key_str = path.to_string_lossy();
let key = key_str.as_bytes();
txn.get::<dirty_file_info::Owned, &[u8]>(self.tables.dirty_files, &key)
.expect("db: Failed to get entry from dirty_files table")
.map(|value| {
let value = value.get().expect("capnp: Failed to get dirty file info");
let info = value
.get_source_info()
.expect("capnp: Failed to get source info");
FileState {
path: path.to_path_buf(),
state: data::FileState::Exists,
last_modified: info.get_last_modified(),
length: info.get_length(),
ty: info.get_type().expect("Failed to read type in source info"),
}
})
}
pub fn get_file_state<'a, V: DBTransaction<'a, T>, T: lmdb::Transaction + 'a>(
&self,
txn: &'a V,
path: &Path,
) -> Option<FileState> {
let key_str = path.to_string_lossy();
let key = key_str.as_bytes();
txn.get::<source_file_info::Owned, &[u8]>(self.tables.source_files, &key)
.expect("db: Failed to get entry from source_files table")
.map(|value| {
let info = value.get().expect("capnp: Failed to get source file info");
FileState {
path: path.to_path_buf(),
state: data::FileState::Exists,
last_modified: info.get_last_modified(),
length: info.get_length(),
ty: info
.get_type()
.expect("Failed to read type in source file info"),
}
})
}
pub fn register_listener(&self, sender: UnboundedSender<FileTrackerEvent>) {
self.listener_tx
.unbounded_send(sender)
.expect("Failed registering listener")
}
pub async fn stop(&self) {
if self.is_running() {
self.stopping_event.notify(std::usize::MAX);
self.listener_rx.lock().await;
assert_eq!(false, self.is_running.load(Ordering::Acquire));
}
}
pub fn is_running(&self) -> bool {
self.is_running.load(Ordering::Acquire)
}
pub async fn run(&self) {
if self
.is_running
.compare_exchange(false, true, Ordering::AcqRel, Ordering::Relaxed)
.is_err()
{
return;
}
let (watcher_tx, mut watcher_rx) = mpsc::unbounded_channel();
let to_watch: Vec<PathBuf> = self.get_watch_dirs();
let mut watcher =
watcher::DirWatcher::from_path_iter(to_watch.iter().map(|p| Path::new(p)), watcher_tx)
.expect("watcher: Failed to watch specified path");
let stop_handle = watcher.stop_handle();
thread::spawn(move || watcher.run());
let mut listeners = ListenersList::new();
let mut scan_stack = Vec::new();
let mut stopping = self.stopping_event.listen();
let mut listener_rx_guard = self.listener_rx.lock().await;
let listener_rx = listener_rx_guard.get_mut();
let mut dirty = false;
#[allow(unused_mut)]
loop {
let mut delay = time::sleep(Duration::from_millis(40));
tokio::select! {
new_listener = listener_rx.next() => listeners.register(new_listener),
_ = delay, if dirty => {
listeners.send_event(FileTrackerEvent::Update);
dirty = false;
},
mut maybe_file_event = watcher_rx.recv() => {
if maybe_file_event.is_none() {
debug!("FileTracker: stopping due to exhausted watcher");
break;
}
let mut txn = self.get_rw_txn().await;
while let Some(file_event) = maybe_file_event {
match events::handle_file_event(&mut txn, &self.tables, file_event, &mut scan_stack, &self.watch_dirs) {
Ok(Some(evt)) => listeners.send_event(evt),
Ok(None) => {},
Err(err) => panic!("Error while handling file event: {}", err),
}
maybe_file_event = watcher_rx.recv().now_or_never().flatten();
}
if txn.dirty {
txn.commit().expect("Failed to commit");
dirty = true;
}
}
_ = &mut stopping => {
debug!("FileTracker: stopping due to stop() notification");
break;
}
}
}
listeners.send_event(FileTrackerEvent::Update);
drop(stop_handle);
self.is_running.store(false, Ordering::Release);
}
}
#[cfg(not(target_os = "macos"))] #[cfg(test)]
pub mod tests {
use std::{
fs,
future::Future,
path::{Path, PathBuf},
sync::Arc,
time::Duration,
};
use tokio::time::timeout;
use super::*;
use crate::{
capnp_db::Environment,
file_tracker::{FileTracker, FileTrackerEvent},
};
pub async fn with_tracker<F, T>(f: F)
where
T: Future<Output = ()>,
F: FnOnce(Arc<FileTracker>, UnboundedReceiver<FileTrackerEvent>, PathBuf) -> T,
{
let _ = crate::init_logging();
let db_dir = tempfile::tempdir().unwrap();
let asset_dir = tempfile::tempdir().unwrap();
let _ = fs::create_dir(db_dir.path());
let asset_paths = vec![asset_dir.path().to_str().unwrap()];
let db = Arc::new(
Environment::with_map_size(db_dir.path(), 1 << 21).unwrap_or_else(|_| {
panic!(
"failed to create db environment {}",
db_dir.path().to_string_lossy()
)
}),
);
let tracker = Arc::new(FileTracker::new(db, asset_paths));
let (tx, mut rx) = unbounded();
tracker.register_listener(tx);
let tracker_clone = tracker.clone();
let handle = tokio::task::spawn(async move { tracker_clone.run().await });
expect_event(&mut rx).await;
f(tracker.clone(), rx, asset_dir.into_path()).await;
tracker.stop().await;
handle.await.unwrap();
}
async fn expect_no_event(rx: &mut UnboundedReceiver<FileTrackerEvent>) {
match timeout(Duration::from_millis(1000), rx.next()).await {
Err(_) => return,
Ok(evt) => panic!("Received unexpected event {:?}", evt),
}
}
async fn expect_event(rx: &mut UnboundedReceiver<FileTrackerEvent>) -> FileTrackerEvent {
match timeout(Duration::from_millis(10000), rx.next()).await {
Err(_) => panic!("Timed out waiting for file event"),
Ok(evt) => return evt.unwrap(),
}
}
async fn expect_no_file_state(t: &FileTracker, asset_dir: &Path, name: &str) {
let txn = t.get_rw_txn().await;
let canonical_path = canonicalize_path(&asset_dir.join(name));
assert!(
t.get_file_state(&txn, &canonical_path).is_none(),
"expected no file state for file {}",
name
);
}
async fn expect_file_state(t: &FileTracker, asset_dir: &Path, name: &str) {
let txn = t.get_rw_txn().await;
let canonical_path = canonicalize_path(&asset_dir.join(name));
t.get_file_state(&txn, &canonical_path)
.unwrap_or_else(|| panic!("expected file state for file {}", name));
}
pub async fn add_test_dir(asset_dir: &Path, name: &str) -> PathBuf {
let path = PathBuf::from(asset_dir).join(name);
tokio::fs::create_dir(&path).await.expect("create dir");
path
}
pub async fn add_test_file(asset_dir: &Path, name: &str) {
tokio::fs::copy(
PathBuf::from("tests/file_tracker/").join(name),
asset_dir.join(name),
)
.await
.expect("copy test file");
}
#[cfg(target_family = "windows")]
pub fn add_symlink_file<P: AsRef<Path>, Q: AsRef<Path>, R: AsRef<Path>>(
asset_dir: &P,
name: &Q,
target: &R,
) {
match std::os::windows::fs::symlink_file(
target,
PathBuf::from(asset_dir.as_ref()).join(name),
) {
Ok(_) => {}
Err(err) if err.kind() == std::io::ErrorKind::AlreadyExists => {}
err => panic!("failed to create symlink file: {:?}", err),
}
}
#[cfg(target_family = "unix")]
pub fn add_symlink_file<P: AsRef<Path>, Q: AsRef<Path>, R: AsRef<Path>>(
asset_dir: &P,
name: &Q,
target: &R,
) {
match std::os::unix::fs::symlink(target, PathBuf::from(asset_dir.as_ref()).join(name)) {
Ok(_) => {}
Err(err) if err.kind() == std::io::ErrorKind::AlreadyExists => {}
err => panic!("failed to create symlink file: {:?}", err),
}
}
#[cfg(target_family = "windows")]
pub fn add_symlink_dir<P: AsRef<Path>, Q: AsRef<Path>, R: AsRef<Path>>(
asset_dir: &P,
name: &Q,
target: &R,
) {
match std::os::windows::fs::symlink_dir(
target,
PathBuf::from(asset_dir.as_ref()).join(name),
) {
Ok(_) => {}
Err(err) if err.kind() == std::io::ErrorKind::AlreadyExists => {}
err => panic!("failed to create symlink file: {:?}", err),
}
}
#[cfg(target_family = "unix")]
pub fn add_symlink_dir<P: AsRef<Path>, Q: AsRef<Path>, R: AsRef<Path>>(
asset_dir: &P,
name: &Q,
target: &R,
) {
match std::os::unix::fs::symlink(target, PathBuf::from(asset_dir.as_ref()).join(name)) {
Ok(_) => {}
Err(err) if err.kind() == std::io::ErrorKind::AlreadyExists => {}
err => panic!("failed to create symlink file: {:?}", err),
}
}
async fn expect_dirty_file_state(t: &FileTracker, asset_dir: &Path, name: &str) {
let txn = t.get_rw_txn().await;
let path = canonicalize_path(&PathBuf::from(asset_dir));
let canonical_path = path.join(name);
t.get_dirty_file_state(&txn, &canonical_path)
.unwrap_or_else(|| panic!("expected dirty file state for file {}", name));
}
async fn clear_dirty_file_state(t: &FileTracker) {
let mut txn = t.get_rw_txn().await;
for f in t.read_dirty_files(&txn) {
t.delete_dirty_file_state(&mut txn, &f.path);
}
}
#[tokio::test]
async fn test_create_file() {
with_tracker(|t, mut rx, asset_dir| async move {
add_test_file(&asset_dir, "test.txt").await;
expect_event(&mut rx).await;
expect_no_event(&mut rx).await;
expect_file_state(&t, &asset_dir, "test.txt").await;
expect_dirty_file_state(&t, &asset_dir, "test.txt").await;
})
.await;
}
#[tokio::test]
async fn test_modify_file() {
with_tracker(|t, mut rx, asset_dir| async move {
add_test_file(&asset_dir, "test.txt").await;
expect_event(&mut rx).await;
expect_file_state(&t, &asset_dir, "test.txt").await;
expect_dirty_file_state(&t, &asset_dir, "test.txt").await;
clear_dirty_file_state(&t).await;
tokio::fs::File::create(asset_dir.join("test.txt"))
.await
.expect("truncate test file");
expect_event(&mut rx).await;
expect_no_event(&mut rx).await;
expect_file_state(&t, &asset_dir, "test.txt").await;
expect_dirty_file_state(&t, &asset_dir, "test.txt").await;
})
.await;
}
#[tokio::test]
async fn test_delete_file() {
with_tracker(|t, mut rx, asset_dir| async move {
add_test_file(&asset_dir, "test.txt").await;
expect_event(&mut rx).await;
expect_file_state(&t, &asset_dir, "test.txt").await;
expect_dirty_file_state(&t, &asset_dir, "test.txt").await;
clear_dirty_file_state(&t).await;
tokio::fs::remove_file(asset_dir.join("test.txt"))
.await
.expect("test file could not be deleted");
expect_event(&mut rx).await;
expect_no_event(&mut rx).await;
expect_no_file_state(&t, &asset_dir, "test.txt").await;
expect_dirty_file_state(&t, &asset_dir, "test.txt").await;
})
.await;
}
#[tokio::test]
async fn test_create_dir() {
with_tracker(|t, mut rx, asset_dir| async move {
add_test_dir(&asset_dir, "testdir").await;
expect_event(&mut rx).await;
expect_no_event(&mut rx).await;
expect_file_state(&t, &asset_dir, "testdir").await;
expect_dirty_file_state(&t, &asset_dir, "testdir").await;
})
.await;
}
#[tokio::test]
async fn test_create_file_in_dir() {
with_tracker(|t, mut rx, asset_dir| async move {
let dir = add_test_dir(&asset_dir, "testdir").await;
expect_event(&mut rx).await;
expect_no_event(&mut rx).await;
expect_file_state(&t, &asset_dir, "testdir").await;
expect_dirty_file_state(&t, &asset_dir, "testdir").await;
add_test_file(&dir, "test.txt").await;
expect_event(&mut rx).await;
expect_no_event(&mut rx).await;
expect_file_state(&t, &dir, "test.txt").await;
expect_dirty_file_state(&t, &dir, "test.txt").await;
})
.await;
}
#[tokio::test]
async fn test_create_emacs_lockfile() {
with_tracker(|t, mut rx, asset_dir| async move {
add_symlink_file(
&asset_dir,
&"emacs.symlink".to_string(),
&"emacs@lock.file:buffer".to_string(),
);
expect_event(&mut rx).await;
expect_no_event(&mut rx).await;
expect_file_state(&t, &asset_dir, "emacs.symlink").await;
expect_dirty_file_state(&t, &asset_dir, "emacs.symlink").await;
assert!(t.get_watch_dirs() == vec![asset_dir])
})
.await;
}
#[tokio::test]
async fn test_create_symlink_dir() {
with_tracker(|t, mut rx, asset_dir| {
async move {
let watch_dir = tempfile::tempdir().unwrap();
add_symlink_dir(&asset_dir, &"dir_symlink".to_string(), &watch_dir);
expect_event(&mut rx).await;
expect_event(&mut rx).await;
expect_no_event(&mut rx).await;
expect_file_state(&t, &asset_dir, "dir_symlink").await;
expect_dirty_file_state(&t, &asset_dir, "dir_symlink").await;
assert!(t.get_watch_dirs() == vec![asset_dir, watch_dir.path().to_path_buf()]);
add_test_file(watch_dir.path(), "test.txt").await;
expect_event(&mut rx).await;
expect_no_event(&mut rx).await;
expect_file_state(&t, &watch_dir.path(), "test.txt").await;
expect_dirty_file_state(&t, &watch_dir.path(), "test.txt").await;
}
})
.await;
}
#[tokio::test]
async fn test_delete_symlink_dir() {
with_tracker(|t, mut rx, asset_dir| async move {
let watch_dir = tempfile::tempdir().unwrap();
add_symlink_dir(&asset_dir, &"dir_symlink".to_string(), &watch_dir);
expect_event(&mut rx).await;
expect_event(&mut rx).await;
expect_no_event(&mut rx).await;
expect_file_state(&t, &asset_dir, "dir_symlink").await;
expect_dirty_file_state(&t, &asset_dir, "dir_symlink").await;
assert!(t.get_watch_dirs() == vec![asset_dir.clone(), watch_dir.path().to_path_buf()]);
#[cfg(target_family = "windows")]
tokio::fs::remove_dir(asset_dir.join("dir_symlink"))
.await
.expect("test file could not be deleted");
#[cfg(target_family = "unix")]
tokio::fs::remove_file(asset_dir.join("dir_symlink"))
.await
.expect("test file could not be deleted");
expect_event(&mut rx).await;
expect_event(&mut rx).await;
expect_no_event(&mut rx).await;
expect_no_file_state(&t, &asset_dir, "dir_symlink").await;
expect_dirty_file_state(&t, &asset_dir, "dir_symlink").await;
assert!(t.get_watch_dirs() == vec![asset_dir]);
})
.await;
}
}