small_bin/
watcher.rs

1use crate::{
2    config::AppConfig,
3    database::{Database, QueueItem},
4    *,
5};
6use anyhow::Result;
7use notify::{Config, Event, RecommendedWatcher, RecursiveMode, Watcher};
8use std::{path::Path, sync::Arc};
9use tokio::sync::mpsc;
10
11
12#[derive(Debug)]
13pub struct FileWatcher {
14    config: Arc<AppConfig>,
15    database: Arc<Database>,
16}
17
18
19impl FileWatcher {
20    pub fn new(config: Arc<AppConfig>, database: Arc<Database>) -> Self {
21        FileWatcher {
22            config,
23            database,
24        }
25    }
26
27
28    pub async fn start(self: Arc<Self>) -> Result<()> {
29        info!("Launching Small Filesystem Handler");
30        let config = self
31            .config
32            .select_config()
33            .expect("One of configs should always be selected!");
34        info!("Watching path: {}", config.watch_path);
35
36        let (tx, mut rx) = mpsc::unbounded_channel();
37        let mut watcher = RecommendedWatcher::new(
38            move |res: Result<Event, notify::Error>| {
39                if let Ok(event) = res {
40                    let _ = tx.send(event);
41                }
42            },
43            Config::default(),
44        )?;
45
46        watcher.watch(Path::new(&config.watch_path), RecursiveMode::NonRecursive)?;
47        info!("Filesystem non-recursive events watcher initialized");
48
49        // Keep watcher alive and process events
50        while let Some(event) = rx.recv().await {
51            if let Err(e) = self.handle_event(event).await {
52                error!("Error handling file event: {e:?}");
53            }
54        }
55
56        Ok(())
57    }
58
59
60    async fn handle_event(&self, event: Event) -> Result<()> {
61        let temp_pattern = regex::Regex::new(r".*-[a-zA-Z]{4,}$")?;
62        for path in event.paths {
63            let path_str = path.to_string_lossy().to_string();
64            debug!("Handling event: {:?} for path {path_str}", event.kind);
65            if !path.exists() {
66                debug!(
67                    "File doesn't exist: {path_str} after event {:?}. Skipped process_event",
68                    event.kind
69                );
70                continue;
71            }
72            if temp_pattern.is_match(&path_str) {
73                debug!("{path_str} matches temp file name! Skipping");
74                continue;
75            }
76            self.process_event(&path_str)?;
77        }
78
79        Ok(())
80    }
81
82
83    fn process_event(&self, file_path: &str) -> Result<()> {
84        debug!("Processing event for path: {file_path}");
85        let config = self
86            .config
87            .select_config()
88            .expect("One of configs should always be selected!");
89        let uuid_from_file =
90            uuid::Uuid::new_v3(&uuid::Uuid::NAMESPACE_OID, file_path.as_bytes()).to_string();
91        let remote_dest_file = format!("{}/{uuid_from_file}", config.remote_path);
92
93        // Add to queue
94        let queue_item = QueueItem {
95            local_file: file_path.to_string(),
96            remote_file: remote_dest_file,
97            uuid: uuid_from_file,
98        };
99        self.database.add_to_queue(&queue_item)?;
100        debug!("Added file to queue: {file_path}");
101        Ok(())
102    }
103}