use std::io::Result;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use crate::frontmatter;
use crate::fs::{AsyncFileSystem, BoxFuture};
use crate::link_parser;
use super::callback_registry::{CallbackRegistry, EventCallback, SubscriptionId};
use super::events::FileSystemEvent;
/// Wrapper around an [`AsyncFileSystem`] that reports file operations to
/// callbacks registered in a [`CallbackRegistry`].
pub struct EventEmittingFs<FS: AsyncFileSystem> {
    /// The wrapped filesystem that performs the actual operations.
    inner: FS,
    /// Registry of subscribed callbacks; held behind an `Arc` so it can be shared.
    registry: Arc<CallbackRegistry>,
    /// While `false`, `emit` drops events instead of dispatching them.
    enabled: AtomicBool,
}
impl<FS: AsyncFileSystem> EventEmittingFs<FS> {
    /// Wraps `inner` with a freshly created callback registry; events start enabled.
    pub fn new(inner: FS) -> Self {
        Self::with_registry(inner, Arc::new(CallbackRegistry::new()))
    }

    /// Wraps `inner` using an existing (possibly shared) `registry`; events start enabled.
    pub fn with_registry(inner: FS, registry: Arc<CallbackRegistry>) -> Self {
        let enabled = AtomicBool::new(true);
        Self {
            inner,
            registry,
            enabled,
        }
    }

    /// Returns whether events are currently dispatched to subscribers.
    pub fn is_enabled(&self) -> bool {
        self.enabled.load(Ordering::SeqCst)
    }

    /// Turns event dispatching on (`true`) or off (`false`) for this wrapper.
    pub fn set_enabled(&self, enabled: bool) {
        self.enabled.store(enabled, Ordering::SeqCst);
    }

    /// Subscribes `callback` to the registry and returns the id needed to unsubscribe.
    pub fn on_event(&self, callback: EventCallback) -> SubscriptionId {
        self.registry.subscribe(callback)
    }

    /// Unsubscribes the callback identified by `id`, returning the registry's result.
    pub fn off_event(&self, id: SubscriptionId) -> bool {
        self.registry.unsubscribe(id)
    }

    /// Borrows the callback registry used by this wrapper.
    pub fn registry(&self) -> &Arc<CallbackRegistry> {
        &self.registry
    }

    /// Borrows the wrapped filesystem.
    pub fn inner(&self) -> &FS {
        &self.inner
    }

    /// Dispatches `event` to the registry unless events are disabled.
    fn emit(&self, event: FileSystemEvent) {
        if !self.is_enabled() {
            return;
        }
        self.registry.emit(&event);
    }

    /// Parses `content` and converts its frontmatter to JSON.
    ///
    /// Returns `None` when parsing or the JSON conversion fails.
    fn extract_frontmatter(&self, content: &str) -> Option<serde_json::Value> {
        let document = frontmatter::parse_or_empty(content).ok()?;
        serde_json::to_value(&document.frontmatter).ok()
    }

    /// Resolves the `part_of` frontmatter entry of `content` to a canonical path.
    ///
    /// Returns `None` when parsing fails, the key is absent, or its value is
    /// not a string. The link is canonicalized relative to `file_path`.
    fn get_parent_from_content(&self, file_path: &Path, content: &str) -> Option<PathBuf> {
        let document = frontmatter::parse_or_empty(content).ok()?;
        let raw_link = document.frontmatter.get("part_of")?.as_str()?;
        let link = link_parser::parse_link(raw_link);
        Some(PathBuf::from(link_parser::to_canonical(&link, file_path)))
    }
}
impl<FS: AsyncFileSystem + Clone> Clone for EventEmittingFs<FS> {
fn clone(&self) -> Self {
Self {
inner: self.inner.clone(),
registry: Arc::clone(&self.registry),
enabled: AtomicBool::new(self.enabled.load(Ordering::SeqCst)),
}
}
}
/// Non-wasm [`AsyncFileSystem`] implementation.
///
/// Every operation is forwarded to the wrapped filesystem. `write_file`,
/// `create_new`, `delete_file` and `move_file` additionally emit a
/// [`FileSystemEvent`] when the underlying operation succeeds; all other
/// operations are plain pass-throughs.
#[cfg(not(target_arch = "wasm32"))]
impl<FS: AsyncFileSystem + Send + Sync> AsyncFileSystem for EventEmittingFs<FS> {
    /// Pass-through to the wrapped filesystem.
    fn read_to_string<'a>(&'a self, path: &'a Path) -> BoxFuture<'a, Result<String>> {
        self.inner.read_to_string(path)
    }

    /// Writes `content` to `path` and returns the wrapped filesystem's result.
    ///
    /// On success one of two events is emitted:
    /// * the path did not exist before: `FileSystemEvent::file_created_with_metadata`;
    /// * the path existed: `FileSystemEvent::metadata_changed`, but only if the
    ///   new content has parseable frontmatter that differs from the previous
    ///   one (an unknown previous frontmatter counts as "changed").
    ///
    /// No event is emitted when the write fails.
    fn write_file<'a>(&'a self, path: &'a Path, content: &'a str) -> BoxFuture<'a, Result<()>> {
        Box::pin(async move {
            // Reading the previous content is only useful for change detection,
            // so skip it while events are disabled.
            let old_frontmatter = if self.is_enabled() {
                self.inner
                    .read_to_string(path)
                    .await
                    .ok()
                    .and_then(|old_content| self.extract_frontmatter(&old_content))
            } else {
                None
            };
            // A file whose previous content could not be read or parsed still
            // exists. Always fall back to an explicit existence check so such a
            // file is not misreported as newly created.
            let existed = old_frontmatter.is_some() || self.inner.exists(path).await;
            let result = self.inner.write_file(path, content).await;
            if result.is_ok() {
                let new_frontmatter = self.extract_frontmatter(content);
                if existed {
                    if let Some(new_fm) = new_frontmatter {
                        // `None` (unknown old frontmatter) never equals `Some`,
                        // so it is treated as a change.
                        let changed = old_frontmatter.as_ref() != Some(&new_fm);
                        if changed {
                            self.emit(FileSystemEvent::metadata_changed(
                                path.to_path_buf(),
                                new_fm,
                            ));
                        }
                    }
                } else {
                    // The parent link is only needed for the creation event.
                    let parent_path = self.get_parent_from_content(path, content);
                    self.emit(FileSystemEvent::file_created_with_metadata(
                        path.to_path_buf(),
                        new_frontmatter,
                        parent_path,
                    ));
                }
            }
            result
        })
    }

    /// Creates a new file via the wrapped filesystem and, on success, emits
    /// `FileSystemEvent::file_created_with_metadata` with the frontmatter and
    /// `part_of` parent extracted from `content`.
    fn create_new<'a>(&'a self, path: &'a Path, content: &'a str) -> BoxFuture<'a, Result<()>> {
        Box::pin(async move {
            let result = self.inner.create_new(path, content).await;
            if result.is_ok() {
                let frontmatter = self.extract_frontmatter(content);
                let parent_path = self.get_parent_from_content(path, content);
                self.emit(FileSystemEvent::file_created_with_metadata(
                    path.to_path_buf(),
                    frontmatter,
                    parent_path,
                ));
            }
            result
        })
    }

    /// Deletes `path` via the wrapped filesystem and, on success, emits
    /// `FileSystemEvent::file_deleted_with_parent`.
    ///
    /// While events are enabled the file is read *before* deletion so that its
    /// `part_of` parent can be included in the event.
    fn delete_file<'a>(&'a self, path: &'a Path) -> BoxFuture<'a, Result<()>> {
        Box::pin(async move {
            let parent_path = if self.is_enabled() {
                self.inner
                    .read_to_string(path)
                    .await
                    .ok()
                    .and_then(|content| self.get_parent_from_content(path, &content))
            } else {
                None
            };
            let result = self.inner.delete_file(path).await;
            if result.is_ok() {
                self.emit(FileSystemEvent::file_deleted_with_parent(
                    path.to_path_buf(),
                    parent_path,
                ));
            }
            result
        })
    }

    /// Pass-through to the wrapped filesystem.
    fn list_md_files<'a>(&'a self, dir: &'a Path) -> BoxFuture<'a, Result<Vec<PathBuf>>> {
        self.inner.list_md_files(dir)
    }

    /// Pass-through to the wrapped filesystem.
    fn exists<'a>(&'a self, path: &'a Path) -> BoxFuture<'a, bool> {
        self.inner.exists(path)
    }

    /// Pass-through to the wrapped filesystem.
    fn create_dir_all<'a>(&'a self, path: &'a Path) -> BoxFuture<'a, Result<()>> {
        self.inner.create_dir_all(path)
    }

    /// Pass-through to the wrapped filesystem.
    fn is_dir<'a>(&'a self, path: &'a Path) -> BoxFuture<'a, bool> {
        self.inner.is_dir(path)
    }

    /// Moves `from` to `to` via the wrapped filesystem and, on success, emits
    /// `FileSystemEvent::file_renamed` when both paths share the same parent
    /// directory, or `FileSystemEvent::file_moved` otherwise.
    fn move_file<'a>(&'a self, from: &'a Path, to: &'a Path) -> BoxFuture<'a, Result<()>> {
        Box::pin(async move {
            let result = self.inner.move_file(from, to).await;
            if result.is_ok() {
                let from_parent = from.parent();
                let to_parent = to.parent();
                if from_parent == to_parent {
                    self.emit(FileSystemEvent::file_renamed(
                        from.to_path_buf(),
                        to.to_path_buf(),
                    ));
                } else {
                    self.emit(FileSystemEvent::file_moved(
                        to.to_path_buf(),
                        from_parent.map(PathBuf::from),
                        to_parent.map(PathBuf::from),
                    ));
                }
            }
            result
        })
    }

    /// Pass-through to the wrapped filesystem.
    fn read_binary<'a>(&'a self, path: &'a Path) -> BoxFuture<'a, Result<Vec<u8>>> {
        self.inner.read_binary(path)
    }

    /// Pass-through to the wrapped filesystem; no event is emitted.
    fn write_binary<'a>(
        &'a self,
        path: &'a Path,
        content: &'a [u8],
    ) -> BoxFuture<'a, Result<()>> {
        self.inner.write_binary(path, content)
    }

    /// Pass-through to the wrapped filesystem.
    fn list_files<'a>(&'a self, dir: &'a Path) -> BoxFuture<'a, Result<Vec<PathBuf>>> {
        self.inner.list_files(dir)
    }

    /// Pass-through to the wrapped filesystem.
    fn get_modified_time<'a>(&'a self, path: &'a Path) -> BoxFuture<'a, Option<i64>> {
        self.inner.get_modified_time(path)
    }

    /// Forwards the sync-write start marker to the wrapped filesystem.
    fn mark_sync_write_start(&self, path: &Path) {
        self.inner.mark_sync_write_start(path);
    }

    /// Forwards the sync-write end marker to the wrapped filesystem.
    fn mark_sync_write_end(&self, path: &Path) {
        self.inner.mark_sync_write_end(path);
    }
}
/// wasm32 [`AsyncFileSystem`] implementation.
///
/// Every operation is forwarded to the wrapped filesystem. `write_file`,
/// `create_new`, `delete_file` and `move_file` additionally emit a
/// [`FileSystemEvent`] when the underlying operation succeeds; all other
/// operations are plain pass-throughs.
#[cfg(target_arch = "wasm32")]
impl<FS: AsyncFileSystem> AsyncFileSystem for EventEmittingFs<FS> {
    /// Pass-through to the wrapped filesystem.
    fn read_to_string<'a>(&'a self, path: &'a Path) -> BoxFuture<'a, Result<String>> {
        self.inner.read_to_string(path)
    }

    /// Writes `content` to `path` and returns the wrapped filesystem's result.
    ///
    /// On success one of two events is emitted:
    /// * the path did not exist before: `FileSystemEvent::file_created_with_metadata`;
    /// * the path existed: `FileSystemEvent::metadata_changed`, but only if the
    ///   new content has parseable frontmatter that differs from the previous
    ///   one (an unknown previous frontmatter counts as "changed").
    ///
    /// No event is emitted when the write fails.
    fn write_file<'a>(&'a self, path: &'a Path, content: &'a str) -> BoxFuture<'a, Result<()>> {
        Box::pin(async move {
            // Reading the previous content is only useful for change detection,
            // so skip it while events are disabled.
            let old_frontmatter = if self.is_enabled() {
                self.inner
                    .read_to_string(path)
                    .await
                    .ok()
                    .and_then(|old_content| self.extract_frontmatter(&old_content))
            } else {
                None
            };
            // A file whose previous content could not be read or parsed still
            // exists. Always fall back to an explicit existence check so such a
            // file is not misreported as newly created.
            let existed = old_frontmatter.is_some() || self.inner.exists(path).await;
            let result = self.inner.write_file(path, content).await;
            if result.is_ok() {
                let new_frontmatter = self.extract_frontmatter(content);
                if existed {
                    if let Some(new_fm) = new_frontmatter {
                        // `None` (unknown old frontmatter) never equals `Some`,
                        // so it is treated as a change.
                        let changed = old_frontmatter.as_ref() != Some(&new_fm);
                        if changed {
                            self.emit(FileSystemEvent::metadata_changed(
                                path.to_path_buf(),
                                new_fm,
                            ));
                        }
                    }
                } else {
                    // The parent link is only needed for the creation event.
                    let parent_path = self.get_parent_from_content(path, content);
                    self.emit(FileSystemEvent::file_created_with_metadata(
                        path.to_path_buf(),
                        new_frontmatter,
                        parent_path,
                    ));
                }
            }
            result
        })
    }

    /// Creates a new file via the wrapped filesystem and, on success, emits
    /// `FileSystemEvent::file_created_with_metadata` with the frontmatter and
    /// `part_of` parent extracted from `content`.
    fn create_new<'a>(&'a self, path: &'a Path, content: &'a str) -> BoxFuture<'a, Result<()>> {
        Box::pin(async move {
            let result = self.inner.create_new(path, content).await;
            if result.is_ok() {
                let frontmatter = self.extract_frontmatter(content);
                let parent_path = self.get_parent_from_content(path, content);
                self.emit(FileSystemEvent::file_created_with_metadata(
                    path.to_path_buf(),
                    frontmatter,
                    parent_path,
                ));
            }
            result
        })
    }

    /// Deletes `path` via the wrapped filesystem and, on success, emits
    /// `FileSystemEvent::file_deleted_with_parent`.
    ///
    /// While events are enabled the file is read *before* deletion so that its
    /// `part_of` parent can be included in the event.
    fn delete_file<'a>(&'a self, path: &'a Path) -> BoxFuture<'a, Result<()>> {
        Box::pin(async move {
            let parent_path = if self.is_enabled() {
                self.inner
                    .read_to_string(path)
                    .await
                    .ok()
                    .and_then(|content| self.get_parent_from_content(path, &content))
            } else {
                None
            };
            let result = self.inner.delete_file(path).await;
            if result.is_ok() {
                self.emit(FileSystemEvent::file_deleted_with_parent(
                    path.to_path_buf(),
                    parent_path,
                ));
            }
            result
        })
    }

    /// Pass-through to the wrapped filesystem.
    fn list_md_files<'a>(&'a self, dir: &'a Path) -> BoxFuture<'a, Result<Vec<PathBuf>>> {
        self.inner.list_md_files(dir)
    }

    /// Pass-through to the wrapped filesystem.
    fn exists<'a>(&'a self, path: &'a Path) -> BoxFuture<'a, bool> {
        self.inner.exists(path)
    }

    /// Pass-through to the wrapped filesystem.
    fn create_dir_all<'a>(&'a self, path: &'a Path) -> BoxFuture<'a, Result<()>> {
        self.inner.create_dir_all(path)
    }

    /// Pass-through to the wrapped filesystem.
    fn is_dir<'a>(&'a self, path: &'a Path) -> BoxFuture<'a, bool> {
        self.inner.is_dir(path)
    }

    /// Moves `from` to `to` via the wrapped filesystem and, on success, emits
    /// `FileSystemEvent::file_renamed` when both paths share the same parent
    /// directory, or `FileSystemEvent::file_moved` otherwise.
    fn move_file<'a>(&'a self, from: &'a Path, to: &'a Path) -> BoxFuture<'a, Result<()>> {
        Box::pin(async move {
            let result = self.inner.move_file(from, to).await;
            if result.is_ok() {
                let from_parent = from.parent();
                let to_parent = to.parent();
                if from_parent == to_parent {
                    self.emit(FileSystemEvent::file_renamed(
                        from.to_path_buf(),
                        to.to_path_buf(),
                    ));
                } else {
                    self.emit(FileSystemEvent::file_moved(
                        to.to_path_buf(),
                        from_parent.map(PathBuf::from),
                        to_parent.map(PathBuf::from),
                    ));
                }
            }
            result
        })
    }

    /// Pass-through to the wrapped filesystem.
    fn read_binary<'a>(&'a self, path: &'a Path) -> BoxFuture<'a, Result<Vec<u8>>> {
        self.inner.read_binary(path)
    }

    /// Pass-through to the wrapped filesystem; no event is emitted.
    fn write_binary<'a>(
        &'a self,
        path: &'a Path,
        content: &'a [u8],
    ) -> BoxFuture<'a, Result<()>> {
        self.inner.write_binary(path, content)
    }

    /// Pass-through to the wrapped filesystem.
    fn list_files<'a>(&'a self, dir: &'a Path) -> BoxFuture<'a, Result<Vec<PathBuf>>> {
        self.inner.list_files(dir)
    }

    /// Pass-through to the wrapped filesystem.
    fn get_modified_time<'a>(&'a self, path: &'a Path) -> BoxFuture<'a, Option<i64>> {
        self.inner.get_modified_time(path)
    }

    /// Forwards the sync-write start marker to the wrapped filesystem.
    fn mark_sync_write_start(&self, path: &Path) {
        self.inner.mark_sync_write_start(path);
    }

    /// Forwards the sync-write end marker to the wrapped filesystem.
    fn mark_sync_write_end(&self, path: &Path) {
        self.inner.mark_sync_write_end(path);
    }
}
/// Debug output shows the enabled flag and the registry; the wrapped
/// filesystem is omitted, so `FS` does not need to implement `Debug`.
impl<FS: AsyncFileSystem> std::fmt::Debug for EventEmittingFs<FS> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let enabled = self.is_enabled();
        let mut out = f.debug_struct("EventEmittingFs");
        out.field("enabled", &enabled);
        out.field("registry", &self.registry);
        out.finish()
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::fs::{InMemoryFileSystem, SyncToAsyncFs};
    use futures_lite::future::block_on;
    use std::io::ErrorKind;
    use std::sync::atomic::AtomicUsize;

    /// Concrete wrapper type used by most tests.
    type TestFs = EventEmittingFs<SyncToAsyncFs<InMemoryFileSystem>>;

    /// Builds an event-emitting wrapper around a fresh in-memory filesystem.
    fn create_test_event_fs() -> TestFs {
        EventEmittingFs::new(SyncToAsyncFs::new(InMemoryFileSystem::new()))
    }

    /// Subscribes a callback that counts the events accepted by `is_match`
    /// and returns the shared counter.
    fn count_events(fs: &TestFs, is_match: fn(&FileSystemEvent) -> bool) -> Arc<AtomicUsize> {
        let hits = Arc::new(AtomicUsize::new(0));
        let sink = Arc::clone(&hits);
        fs.on_event(Arc::new(move |event| {
            if is_match(event) {
                sink.fetch_add(1, Ordering::SeqCst);
            }
        }));
        hits
    }

    /// Writes `content` to `path`, panicking if the write fails.
    fn write(fs: &TestFs, path: &str, content: &str) {
        block_on(fs.write_file(Path::new(path), content)).unwrap();
    }

    /// Minimal filesystem that only records how often the sync-write markers
    /// were invoked.
    #[derive(Clone, Default)]
    struct MarkerRecordingFs {
        starts: Arc<AtomicUsize>,
        ends: Arc<AtomicUsize>,
    }

    impl AsyncFileSystem for MarkerRecordingFs {
        fn read_to_string<'a>(&'a self, _path: &'a Path) -> BoxFuture<'a, Result<String>> {
            Box::pin(async move { Ok(String::new()) })
        }

        fn write_file<'a>(
            &'a self,
            _path: &'a Path,
            _content: &'a str,
        ) -> BoxFuture<'a, Result<()>> {
            Box::pin(async move { Ok(()) })
        }

        fn create_new<'a>(
            &'a self,
            _path: &'a Path,
            _content: &'a str,
        ) -> BoxFuture<'a, Result<()>> {
            Box::pin(async move { Ok(()) })
        }

        fn delete_file<'a>(&'a self, _path: &'a Path) -> BoxFuture<'a, Result<()>> {
            Box::pin(async move { Ok(()) })
        }

        fn list_md_files<'a>(&'a self, _dir: &'a Path) -> BoxFuture<'a, Result<Vec<PathBuf>>> {
            Box::pin(async move { Ok(vec![]) })
        }

        fn exists<'a>(&'a self, _path: &'a Path) -> BoxFuture<'a, bool> {
            Box::pin(async move { false })
        }

        fn create_dir_all<'a>(&'a self, _path: &'a Path) -> BoxFuture<'a, Result<()>> {
            Box::pin(async move { Ok(()) })
        }

        fn is_dir<'a>(&'a self, _path: &'a Path) -> BoxFuture<'a, bool> {
            Box::pin(async move { false })
        }

        fn move_file<'a>(&'a self, _from: &'a Path, _to: &'a Path) -> BoxFuture<'a, Result<()>> {
            Box::pin(async move { Err(std::io::Error::from(ErrorKind::NotFound)) })
        }

        fn mark_sync_write_start(&self, _path: &Path) {
            self.starts.fetch_add(1, Ordering::SeqCst);
        }

        fn mark_sync_write_end(&self, _path: &Path) {
            self.ends.fetch_add(1, Ordering::SeqCst);
        }
    }

    #[test]
    fn test_write_emits_file_created() {
        let fs = create_test_event_fs();
        let created = count_events(&fs, |event| {
            matches!(event, FileSystemEvent::FileCreated { .. })
        });

        write(&fs, "test.md", "---\ntitle: Test\n---\nBody");

        assert_eq!(created.load(Ordering::SeqCst), 1);
    }

    #[test]
    fn test_write_existing_emits_metadata_changed_only_when_frontmatter_changes() {
        let fs = create_test_event_fs();

        // Initial write happens before subscribing, so it is not counted.
        write(&fs, "test.md", "---\ntitle: First\n---\nBody");
        let changed = count_events(&fs, |event| {
            matches!(event, FileSystemEvent::MetadataChanged { .. })
        });

        // Frontmatter changed: one event.
        write(&fs, "test.md", "---\ntitle: Updated\n---\nBody");
        assert_eq!(changed.load(Ordering::SeqCst), 1);

        // Only the body changed: no additional event.
        write(&fs, "test.md", "---\ntitle: Updated\n---\nBody changed!");
        assert_eq!(changed.load(Ordering::SeqCst), 1);

        // Frontmatter changed again: second event.
        write(&fs, "test.md", "---\ntitle: Final Title\n---\nBody changed!");
        assert_eq!(changed.load(Ordering::SeqCst), 2);
    }

    #[test]
    fn test_delete_emits_file_deleted() {
        let fs = create_test_event_fs();
        write(&fs, "test.md", "content");
        let deleted = count_events(&fs, |event| {
            matches!(event, FileSystemEvent::FileDeleted { .. })
        });

        block_on(fs.delete_file(Path::new("test.md"))).unwrap();

        assert_eq!(deleted.load(Ordering::SeqCst), 1);
    }

    #[test]
    fn test_disabled_skips_events() {
        let fs = create_test_event_fs();
        fs.set_enabled(false);
        let seen = count_events(&fs, |_| true);

        write(&fs, "test.md", "content");

        assert_eq!(seen.load(Ordering::SeqCst), 0);
    }

    #[test]
    fn test_unsubscribe() {
        let fs = create_test_event_fs();
        let seen = Arc::new(AtomicUsize::new(0));
        let sink = Arc::clone(&seen);
        // Subscribed inline because the subscription id is needed below.
        let subscription = fs.on_event(Arc::new(move |_| {
            sink.fetch_add(1, Ordering::SeqCst);
        }));

        write(&fs, "test1.md", "content");
        assert_eq!(seen.load(Ordering::SeqCst), 1);

        assert!(fs.off_event(subscription));

        write(&fs, "test2.md", "content");
        assert_eq!(seen.load(Ordering::SeqCst), 1);
    }

    #[test]
    fn test_move_same_parent_emits_renamed() {
        let fs = create_test_event_fs();
        write(&fs, "dir/old.md", "content");
        let renamed = count_events(&fs, |event| {
            matches!(event, FileSystemEvent::FileRenamed { .. })
        });

        block_on(fs.move_file(Path::new("dir/old.md"), Path::new("dir/new.md"))).unwrap();

        assert_eq!(renamed.load(Ordering::SeqCst), 1);
    }

    #[test]
    fn test_move_different_parent_emits_moved() {
        let fs = create_test_event_fs();
        block_on(fs.create_dir_all(Path::new("dir1"))).unwrap();
        block_on(fs.create_dir_all(Path::new("dir2"))).unwrap();
        write(&fs, "dir1/file.md", "content");
        let moved = count_events(&fs, |event| {
            matches!(event, FileSystemEvent::FileMoved { .. })
        });

        block_on(fs.move_file(Path::new("dir1/file.md"), Path::new("dir2/file.md"))).unwrap();

        assert_eq!(moved.load(Ordering::SeqCst), 1);
    }

    #[test]
    fn test_sync_write_markers_forwarded_to_inner_fs() {
        let recorder = MarkerRecordingFs::default();
        let starts = Arc::clone(&recorder.starts);
        let ends = Arc::clone(&recorder.ends);
        let fs = EventEmittingFs::new(recorder);
        let target = Path::new("a.md");

        fs.mark_sync_write_start(target);
        fs.mark_sync_write_end(target);

        assert_eq!(starts.load(Ordering::SeqCst), 1);
        assert_eq!(ends.load(Ordering::SeqCst), 1);
    }
}