extern crate notify;
use std::{collections::HashMap, path::PathBuf};
use notify::{Event, RecommendedWatcher, RecursiveMode, Watcher};
use ractor::{Actor, ActorId, ActorProcessingErr, ActorRef, RpcReplyPort};
#[cfg(test)]
mod tests;
/// Actor that watches the configured files and directories through `notify`
/// and hands every received filesystem event to each registered subscriber.
///
/// The actor itself carries no data; its runtime state lives in
/// `FileWatcherState` and its start-up arguments in `FileWatcherConfig`.
pub struct FileWatcher;
/// Outcome reported back to the caller of a subscribe or unsubscribe request.
///
/// `Debug` is derived so results can be logged and compared with
/// `assert_eq!` / `assert_ne!` by callers and tests.
#[derive(Debug, Clone, Copy, Eq, PartialEq, Hash)]
pub enum SubscriptionResult {
    /// The request was applied: a new subscription was stored, or an existing
    /// one was removed.
    Ok,
    /// A subscribe request used an id that already had a subscription.
    Duplicate,
    /// An unsubscribe request used an id that had no subscription.
    NotFound,
}
/// Messages handled by the `FileWatcher` actor.
pub enum FileWatcherMessage {
/// A filesystem event delivered by the watcher callback; the handler passes
/// a clone of it to every registered subscriber.
Event(Event),
/// An error delivered by the watcher callback; the handler logs it and
/// returns it as an error from `handle`.
FwError(notify::Error),
/// Registers a subscriber under the given actor id.
///
/// The reply is `SubscriptionResult::Ok` when the id was not registered yet
/// and `SubscriptionResult::Duplicate` when it already was. In the duplicate
/// case the previously stored subscriber is replaced by the new one.
Subscribe(
ActorId,
Box<dyn FileWatcherSubscriber>,
RpcReplyPort<SubscriptionResult>,
),
/// Removes the subscriber registered under the given actor id.
///
/// The reply is `SubscriptionResult::Ok` when a subscriber was removed and
/// `SubscriptionResult::NotFound` when the id was not registered.
Unsubscribe(ActorId, RpcReplyPort<SubscriptionResult>),
}
/// Callback interface implemented by anything that wants to receive
/// filesystem events from the `FileWatcher` actor.
///
/// Implementors must be `Send + 'static` because they are boxed and stored in
/// the actor's state.
pub trait FileWatcherSubscriber: Send + 'static {
/// Called once per filesystem event, from inside the actor's message
/// handler. Each subscriber receives its own clone of the event.
fn event_received(&self, ev: Event);
}
/// Start-up arguments for the `FileWatcher` actor: the paths to watch.
///
/// `Debug` and `Clone` are derived so a configuration can be logged and
/// reused, for example to start the actor again with the same paths.
#[derive(Debug, Clone, Default)]
pub struct FileWatcherConfig {
    /// Individual paths, each watched non-recursively.
    pub files: Vec<PathBuf>,
    /// Directory paths, each watched recursively.
    pub directories: Vec<PathBuf>,
}
/// Runtime state owned by a running `FileWatcher` actor.
pub struct FileWatcherState {
// The watcher created in `pre_start`. Stored as an `Option` so `post_stop`
// can take it out of the state and drop it.
watcher: Option<RecommendedWatcher>,
// Registered subscribers, keyed by the actor id supplied when subscribing.
subscriptions: HashMap<ActorId, Box<dyn FileWatcherSubscriber>>,
}
#[cfg_attr(feature = "async-trait", async_trait::async_trait)]
impl Actor for FileWatcher {
type Msg = FileWatcherMessage;
type State = FileWatcherState;
type Arguments = FileWatcherConfig;
async fn pre_start(
&self,
myself: ActorRef<Self::Msg>,
arguments: Self::Arguments,
) -> Result<Self::State, ActorProcessingErr> {
let mut watcher = notify::recommended_watcher(move |res| match res {
Ok(event) => {
let _ = myself.cast(Self::Msg::Event(event));
}
Err(e) => {
let _ = myself.cast(Self::Msg::FwError(e));
}
})?;
for file in arguments.files {
watcher.watch(&file, RecursiveMode::NonRecursive)?
}
for dir in arguments.directories {
watcher.watch(&dir, RecursiveMode::Recursive)?
}
Ok(FileWatcherState {
watcher: Some(watcher),
subscriptions: HashMap::new(),
})
}
async fn post_stop(
&self,
_: ActorRef<Self::Msg>,
state: &mut Self::State,
) -> Result<(), ActorProcessingErr> {
if let Some(w) = state.watcher.take() {
drop(w);
}
Ok(())
}
async fn handle(
&self,
_myself: ActorRef<Self::Msg>,
message: Self::Msg,
state: &mut Self::State,
) -> Result<(), ActorProcessingErr> {
match message {
FileWatcherMessage::Event(watcher_event) => {
for sub in state.subscriptions.values() {
sub.event_received(watcher_event.clone());
}
}
FileWatcherMessage::FwError(e) => {
tracing::error!("Filewatcher error: {:?}", e);
return Err(e.into());
}
FileWatcherMessage::Subscribe(who, f, reply) => {
if state.subscriptions.insert(who, f).is_none() {
let _ = reply.send(SubscriptionResult::Ok);
} else {
let _ = reply.send(SubscriptionResult::Duplicate);
}
}
FileWatcherMessage::Unsubscribe(who, reply) => {
if state.subscriptions.remove(&who).is_some() {
let _ = reply.send(SubscriptionResult::Ok);
} else {
let _ = reply.send(SubscriptionResult::NotFound);
}
}
}
Ok(())
}
}