bubbles 3.1.1

Bubble integration server for powder diffraction
use crossbeam_channel::never;
use crossbeam_channel::unbounded;
use crossbeam_channel::{Receiver, Sender};
#[cfg(target_os = "linux")]
use notify::event::{AccessKind, AccessMode};
use notify::{Event, EventKind};
use notify::{RecommendedWatcher, RecursiveMode, Watcher};
use std::path::PathBuf;
use std::sync::Arc;

/// Receives and discards every message arriving on `start` and `stop`.
///
/// `run` falls back to this function when the filesystem watcher could not be
/// created, so requests sent on the control channels are still consumed even
/// though nothing can be watched.
///
/// Returns once both channels are disconnected. A disconnected receiver is
/// always reported as ready by `select!` (yielding `Err`), so selecting on it
/// again would turn this loop into a busy spin.
//noinspection RsLiveness
fn empty_loop(start: Receiver<String>, stop: Receiver<Arc<String>>) {
    loop {
        select! {
            recv(start) -> msg => {
                if msg.is_err() {
                    // `start` is closed for good: block on `stop` alone until
                    // it is closed too, then finish.
                    for _ in stop.iter() {}
                    return;
                }
            }
            recv(stop) -> msg => {
                if msg.is_err() {
                    // `stop` is closed for good: block on `start` alone until
                    // it is closed too, then finish.
                    for _ in start.iter() {}
                    return;
                }
            }
        }
    }
}

pub fn run(start: Receiver<String>, stop: Receiver<Arc<String>>, event: Sender<PathBuf>) {
    let (ch_send_watch, ch_recv_watch) = unbounded();
    let mut watcher: RecommendedWatcher = match Watcher::new_immediate(move |res| match res {
        Ok(event) => {
            let _ = ch_send_watch.send(event);
        }
        Err(err) => error!("Watcher error: {:?}", err),
    }) {
        Ok(watcher) => watcher,
        Err(e) => {
            error!("Failed to initialize an FS watcher: {}", e);
            return empty_loop(start, stop);
        }
    };
    loop {
        select! {
            recv(ch_recv_watch) -> msg => {
                if let Ok(e) = msg {
                    for path in e.process() {
                        let _ = event.send(path);
                    }
                }
            }
            recv(start) -> msg => {
                if let Ok(path) = msg {
                    match watcher.watch(&path, RecursiveMode::Recursive) {
                        Ok(_) => info!("Watcher is set recursively to {}", path),
                        Err(e) => error!("Could not set watcher for {}: {}", path, e),
                    }
                }
            }
            recv(stop) -> msg => {
                if let Ok(path) = msg {
                    match watcher.unwatch(path.as_ref()) {
                        Ok(_) => info!("Watcher is unset to {}", path),
                        Err(e) => error!("Could not unset watcher for {}: {}", path, e),
                    }
                }
            }
        }
    }
}

/// Turns a value into the list of paths that should be reported for it.
trait Process {
    /// Consumes `self` and returns the paths to report; the list may be empty.
    fn process(self) -> Vec<PathBuf>;
}

impl Process for Event {
    /// Linux: only an access event of kind "closed after write" yields paths;
    /// every other event produces an empty list.
    ///
    /// NOTE(review): presumably chosen so a file is reported once writing has
    /// finished rather than on each modification - confirm.
    #[cfg(target_os = "linux")]
    fn process(self) -> Vec<PathBuf> {
        if let EventKind::Access(AccessKind::Close(AccessMode::Write)) = self.kind {
            self.paths.files()
        } else {
            Vec::new()
        }
    }

    /// Other platforms: creation and modification events yield paths; every
    /// other event produces an empty list.
    #[cfg(not(target_os = "linux"))]
    fn process(self) -> Vec<PathBuf> {
        match self.kind {
            EventKind::Create(_) | EventKind::Modify(_) => self.paths.files(),
            _ => Vec::new(),
        }
    }
}

/// Filtering of a path collection down to the entries that are files.
trait RetainFiles {
    /// Consumes the collection and returns it with non-file entries removed.
    fn files(self) -> Self;
}

impl RetainFiles for Vec<PathBuf> {
    /// Keeps, in their original order, only the paths for which
    /// `Path::is_file` returns `true` at the time of the call.
    fn files(self) -> Self {
        self.into_iter()
            .filter(|candidate| candidate.is_file())
            .collect()
    }
}