small_bin/
watcher.rs

1use crate::{
2    config::AppConfig,
3    database::{Database, QueueItem},
4    *,
5};
6use anyhow::Result;
7use notify::{
8    Config, Event, EventKind, RecommendedWatcher, RecursiveMode, Watcher,
9    event::{MetadataKind, ModifyKind, RenameMode},
10};
11use std::{path::Path, sync::Arc};
12use tokio::sync::mpsc;
13
14
15#[derive(Debug)]
16pub struct FileWatcher {
17    config: Arc<AppConfig>,
18    database: Arc<Database>,
19}
20
21
22impl FileWatcher {
23    pub fn new(config: Arc<AppConfig>, database: Arc<Database>) -> Self {
24        FileWatcher {
25            config,
26            database,
27        }
28    }
29
30
31    pub async fn start(self: Arc<Self>) -> Result<()> {
32        info!("Launching Small Filesystem Handler");
33        let config = self
34            .config
35            .select_config()
36            .expect("One of configs should always be selected!");
37        info!("Watching path: {}", config.watch_path);
38
39        let (tx, mut rx) = mpsc::unbounded_channel();
40        let mut watcher = RecommendedWatcher::new(
41            move |res: Result<Event, notify::Error>| {
42                if let Ok(event) = res {
43                    let _ = tx.send(event);
44                }
45            },
46            Config::default(),
47        )?;
48
49        watcher.watch(Path::new(&config.watch_path), RecursiveMode::NonRecursive)?;
50        info!("Filesystem non-recursive events watcher initialized");
51
52        // Keep watcher alive and process events
53        while let Some(event) = rx.recv().await {
54            if let Err(e) = self.handle_event(event).await {
55                error!("Error handling file event: {e:?}");
56            }
57        }
58
59        Ok(())
60    }
61
62
63    async fn handle_event(&self, event: Event) -> Result<()> {
64        for path in event.paths {
65            let path_str = path.to_string_lossy().to_string();
66            debug!("Handling event: {:?} for path {path_str}", event.kind);
67            if !path.exists() {
68                debug!(
69                    "File doesn't exist: {path_str} after event {:?}. Skipped process_event",
70                    event.kind
71                );
72                continue;
73            }
74            if TEMP_PATTERN.is_match(&path_str) {
75                debug!("{path_str} matches temp file name! Skipping");
76                continue;
77            }
78            // NOTE: do not trigger metadata change events
79            if event.kind == EventKind::Modify(ModifyKind::Metadata(MetadataKind::Any)) {
80                debug!("Skipping event kind: {:?}", event.kind);
81                continue;
82            }
83
84            self.process_event(&path_str)?;
85        }
86
87        Ok(())
88    }
89
90
91    fn process_event(&self, file_path: &str) -> Result<()> {
92        debug!("Processing event for path: {file_path}");
93        let config = self
94            .config
95            .select_config()
96            .expect("One of configs should always be selected!");
97        let uuid_from_file =
98            uuid::Uuid::new_v3(&uuid::Uuid::NAMESPACE_OID, file_path.as_bytes()).to_string();
99        let remote_dest_file = format!("{}/{uuid_from_file}", config.remote_path);
100
101        // Add to queue
102        let queue_item = QueueItem {
103            local_file: file_path.to_string(),
104            remote_file: remote_dest_file,
105            uuid: uuid_from_file,
106        };
107        self.database.add_to_queue(&queue_item)?;
108        debug!("Added file to queue: {file_path}");
109        Ok(())
110    }
111}