fs-librarian 0.3.3

Librarian runs pre-configured commands against a group of files that match a set of filters
use super::{FsOp, Notification, Notifier};
use notify::{Config, Event, EventKind, RecommendedWatcher, RecursiveMode, Watcher};
use std::{
    collections::HashSet,
    env::consts::OS,
    path::PathBuf,
    sync::mpsc::{channel, Sender},
    thread,
};
use tokio_util::sync::CancellationToken;

#[derive(Debug, thiserror::Error)]
pub enum Error {
    #[error("An error was thrown by the filesystem notification system")]
    Notify(#[from] notify::Error),
    #[error("An error was thrown while trying to interract with the notification system")]
    Send(#[from] std::sync::mpsc::SendError<bool>),
}

pub(crate) struct NotifyNotifier {
    stop_cancellation_token: CancellationToken,
}

impl NotifyNotifier {
    pub(crate) fn new() -> Self {
        NotifyNotifier {
            stop_cancellation_token: CancellationToken::new(),
        }
    }
    fn convert_op(event_kind: EventKind) -> FsOp {
        let mut fs_op = FsOp::all();
        if !event_kind.is_remove() {
            fs_op.remove(FsOp::REMOVE);
        }

        fs_op
    }
}

impl Notifier for NotifyNotifier {
    fn start_watching(
        &mut self,
        paths: &HashSet<PathBuf>,
        notification_sender: Sender<Notification>,
    ) -> Result<(), super::Error> {
        let stop_cancellation_token = self.stop_cancellation_token.clone();
        let local_paths = paths.clone();
        thread::spawn(move || {
            let (watcher_sender, watcher_receiver) = channel();
            let mut watcher: RecommendedWatcher =
                match RecommendedWatcher::new(watcher_sender, Config::default()) {
                    Ok(w) => w,
                    Err(e) => panic!("NotifyNotifier returned an error while initializing: {}", e),
                };

            for cur_path in local_paths {
                println!("Watching path {:?}", cur_path);
                if let Err(e) = watcher.watch(&cur_path, RecursiveMode::Recursive) {
                    panic!("NotifyNotifier returned an error while attempting to watch a directory: {}", e);
                }
            }
            loop {
                match watcher_receiver.recv() {
                    Ok(Ok(Event {
                        kind,
                        paths,
                        attrs: _,
                    })) => {
                        for path in paths {
                            let notification = Notification {
                                path,
                                op: Self::convert_op(kind),
                            };
                            if let Err(e) = notification_sender.send(notification) {
                                eprint!("Unable to notify upwards a filesystem event: {:?}", e);
                            }
                        }
                    }
                    Ok(e) => eprintln!("NotifyNotifier returned a broken event: {:?}", e),
                    Err(e) => {
                        panic!("NotifyNotifier returned an error: {}", e);
                    }
                }

                if stop_cancellation_token.is_cancelled() {
                    println!("Cancelling watching using NotifyNotifier");
                    break;
                }
            }
        });

        Ok(())
    }

    fn stop_watching(&mut self) {
        self.stop_cancellation_token.cancel();
    }

    fn is_supported(&self) -> bool {
        if OS == "windows" {
            return false;
        }

        true
    }
}