1use crate::{
2 config::AppConfig,
3 database::{Database, QueueItem},
4 *,
5};
6use anyhow::Result;
7use notify::{
8 Config, Event, EventKind, RecommendedWatcher, RecursiveMode, Watcher,
9 event::{MetadataKind, ModifyKind, RenameMode},
10};
11use std::{path::Path, sync::Arc};
12use tokio::sync::mpsc;
13
14
15#[derive(Debug)]
16pub struct FileWatcher {
17 config: Arc<AppConfig>,
18 database: Arc<Database>,
19}
20
21
22impl FileWatcher {
23 pub fn new(config: Arc<AppConfig>, database: Arc<Database>) -> Self {
24 FileWatcher {
25 config,
26 database,
27 }
28 }
29
30
31 pub async fn start(self: Arc<Self>) -> Result<()> {
32 info!("Launching Small Filesystem Handler");
33 let config = self
34 .config
35 .select_config()
36 .expect("One of configs should always be selected!");
37 info!("Watching path: {}", config.watch_path);
38
39 let (tx, mut rx) = mpsc::unbounded_channel();
40 let mut watcher = RecommendedWatcher::new(
41 move |res: Result<Event, notify::Error>| {
42 if let Ok(event) = res {
43 let _ = tx.send(event);
44 }
45 },
46 Config::default(),
47 )?;
48
49 watcher.watch(Path::new(&config.watch_path), RecursiveMode::NonRecursive)?;
50 info!("Filesystem non-recursive events watcher initialized");
51
52 while let Some(event) = rx.recv().await {
54 if let Err(e) = self.handle_event(event).await {
55 error!("Error handling file event: {e:?}");
56 }
57 }
58
59 Ok(())
60 }
61
62
63 async fn handle_event(&self, event: Event) -> Result<()> {
64 for path in event.paths {
65 let path_str = path.to_string_lossy().to_string();
66 debug!("Handling event: {:?} for path {path_str}", event.kind);
67 if !path.exists() {
68 debug!(
69 "File doesn't exist: {path_str} after event {:?}. Skipped process_event",
70 event.kind
71 );
72 continue;
73 }
74 if TEMP_PATTERN.is_match(&path_str) {
75 debug!("{path_str} matches temp file name! Skipping");
76 continue;
77 }
78 if event.kind == EventKind::Modify(ModifyKind::Metadata(MetadataKind::Any)) {
80 debug!("Skipping event kind: {:?}", event.kind);
81 continue;
82 }
83
84 self.process_event(&path_str)?;
85 }
86
87 Ok(())
88 }
89
90
91 fn process_event(&self, file_path: &str) -> Result<()> {
92 debug!("Processing event for path: {file_path}");
93 let config = self
94 .config
95 .select_config()
96 .expect("One of configs should always be selected!");
97 let uuid_from_file =
98 uuid::Uuid::new_v3(&uuid::Uuid::NAMESPACE_OID, file_path.as_bytes()).to_string();
99 let remote_dest_file = format!("{}/{uuid_from_file}", config.remote_path);
100
101 let queue_item = QueueItem {
103 local_file: file_path.to_string(),
104 remote_file: remote_dest_file,
105 uuid: uuid_from_file,
106 };
107 self.database.add_to_queue(&queue_item)?;
108 debug!("Added file to queue: {file_path}");
109 Ok(())
110 }
111}