1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
use crate::services::ingest::ingest_path;
use notify::{
Config as NotifyConfig, Event, EventKind, RecommendedWatcher, RecursiveMode,
Watcher as NotifyWatcher,
};
use rusqlite::Connection;
use std::path::PathBuf;
use std::sync::{Arc, Mutex};
use tracing::{debug, error, info};
use uuid::Uuid;
/// Watches a directory tree and hands created or modified files to
/// `ingest_path`.
///
/// The struct only holds state; watching begins in `start` and ends in `stop`.
// NOTE(review): dead_code is silenced presumably because nothing constructs
// this type yet; confirm and remove the allow once it is wired in.
#[allow(dead_code)]
pub struct Watcher {
/// Shared SQLite connection; a clone of this handle is passed to every ingest call.
conn: Arc<Mutex<Connection>>,
/// The live `notify` watcher. `None` after `new` and after `stop`.
watcher: Option<RecommendedWatcher>,
}
#[allow(dead_code)]
impl Watcher {
pub fn new(conn: Arc<Mutex<Connection>>) -> Self {
Self {
conn,
watcher: None,
}
}
pub fn start(&mut self, path: PathBuf) {
info!("Starting directory watcher on: {:?}", path);
let conn = self.conn.clone();
// Create a watcher that runs on a separate thread (managed by notify)
let watcher_result = RecommendedWatcher::new(
move |res: notify::Result<Event>| {
match res {
Ok(event) => {
debug!("Watcher event: {:?}", event);
match event.kind {
EventKind::Create(_) | EventKind::Modify(_) => {
for path in event.paths {
if path.is_file() {
info!("New file detected: {:?}", path);
// Locking here inside the callback is safe as long as we don't deadlock.
// ingest_path might take time, blocking notifier thread, but that's ok for now.
let conn_clone = conn.clone();
let path_str = path.to_string_lossy().to_string();
tokio::spawn(async move {
let session_id = Uuid::new_v4().to_string();
if let Err(e) = ingest_path(
conn_clone,
&path_str,
&session_id,
None,
)
.await
{
error!("Failed to ingest file {}: {}", path_str, e);
}
});
}
}
}
_ => {}
}
}
Err(e) => error!("Watch error: {:?}", e),
}
},
NotifyConfig::default(),
);
match watcher_result {
Ok(mut watcher) => {
if let Err(e) = watcher.watch(&path, RecursiveMode::Recursive) {
error!("Failed to start watcher: {:?}", e);
} else {
info!("Watcher started successfully");
}
self.watcher = Some(watcher);
}
Err(e) => {
error!("Failed to create watcher: {:?}", e);
}
}
}
pub fn stop(&mut self) {
self.watcher = None;
}
}