1use std::{path::PathBuf, sync::Arc, time::Duration};
7
8use futures::FutureExt;
9use log::{debug, error, info, trace, warn};
10use mecomp_storage::db::schemas::song::{Song, SongChangeSet, SongMetadata};
11#[cfg(target_os = "macos")]
12use notify::FsEventWatcher;
13#[cfg(target_os = "linux")]
14use notify::INotifyWatcher;
15#[cfg(target_os = "windows")]
16use notify::ReadDirectoryChangesWatcher;
17use notify::{
18 EventKind, RecursiveMode,
19 event::{CreateKind, MetadataKind, ModifyKind, RemoveKind, RenameMode},
20};
21use notify_debouncer_full::RecommendedCache;
22use notify_debouncer_full::{DebouncedEvent, Debouncer, new_debouncer};
23use one_or_many::OneOrMany;
24use surrealdb::{Surreal, engine::local::Db};
25use tokio::sync::Mutex;
26
27use crate::termination::InterruptReceiver;
28
29#[cfg(target_os = "linux")]
30type WatcherType = INotifyWatcher;
31#[cfg(target_os = "macos")]
32type WatcherType = FsEventWatcher;
33#[cfg(target_os = "windows")]
34type WatcherType = ReadDirectoryChangesWatcher;
35
36const VALID_AUDIO_EXTENSIONS: [&str; 4] = ["mp3", "wav", "ogg", "flac"];
37
38pub const MAX_DEBOUNCE_TIME: Duration = Duration::from_millis(500);
39
40#[allow(clippy::missing_inline_in_public_items)]
60pub fn init_music_library_watcher(
61 db: Arc<Surreal<Db>>,
62 library_paths: &[PathBuf],
63 artist_name_separator: OneOrMany<String>,
64 protected_artist_names: OneOrMany<String>,
65 genre_separator: Option<String>,
66 mut interrupt: InterruptReceiver,
67) -> anyhow::Result<MusicLibEventHandlerGuard> {
68 let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
69 let (stop_tx, stop_rx) = tokio::sync::oneshot::channel();
71
72 tokio::task::spawn(
74 async move {
75 let handler = MusicLibEventHandler::new(
76 db,
77 artist_name_separator,
78 protected_artist_names,
79 genre_separator,
80 );
81
82 let mut stop = Box::pin(stop_rx);
83
84 let lock = Arc::new(Mutex::new(()));
85
86 loop {
87 tokio::select! {
88 _ = &mut stop => {
89 info!("stop signal received, stopping watcher");
90 break;
91 }
92 _ = interrupt.wait() => {
93 info!("interrupt signal received, stopping watcher");
94 break;
95 }
96 Some(result) = rx.recv() => {
97 match result {
98 Ok(events) => {
99 for event in events {
100 if let Err(e) = handler.handle_event(event, lock.clone()).await {
101 error!("failed to handle event: {e:?}");
102 }
103 }
104 }
105 Err(errors) => {
106 for error in errors {
107 error!("watch error: {error:?}");
108 }
109 }
110 }
111 }
112 }
113 }
114 }
115 .boxed(),
116 );
117
118 let mut debouncer: Debouncer<WatcherType, _> =
121 new_debouncer(MAX_DEBOUNCE_TIME, None, move |event| {
122 let _ = tx.send(event);
123 })?;
124
125 for path in library_paths {
127 log::debug!("watching path: {}", path.display());
128 debouncer.watch(path, RecursiveMode::Recursive)?;
131 }
132
133 Ok(MusicLibEventHandlerGuard { debouncer, stop_tx })
134}
135
136pub struct MusicLibEventHandlerGuard {
137 debouncer: Debouncer<WatcherType, RecommendedCache>,
138 stop_tx: tokio::sync::oneshot::Sender<()>,
139}
140
141impl MusicLibEventHandlerGuard {
143 #[inline]
144 pub fn stop(self) {
145 let Self { debouncer, stop_tx } = self;
146 stop_tx.send(()).ok();
147 debouncer.stop();
148 }
149}
150
151struct MusicLibEventHandler {
153 db: Arc<Surreal<Db>>,
154 artist_name_separator: OneOrMany<String>,
155 protected_artist_names: OneOrMany<String>,
156 genre_separator: Option<String>,
157}
158
159impl MusicLibEventHandler {
160 pub const fn new(
162 db: Arc<Surreal<Db>>,
163 artist_name_separator: OneOrMany<String>,
164 protected_artist_names: OneOrMany<String>,
165 genre_separator: Option<String>,
166 ) -> Self {
167 Self {
168 db,
169 artist_name_separator,
170 protected_artist_names,
171 genre_separator,
172 }
173 }
174
175 async fn handle_event(
177 &self,
178 event: DebouncedEvent,
179 lock: Arc<Mutex<()>>,
180 ) -> anyhow::Result<()> {
181 trace!("file event detected: {event:?}");
182
183 match event.kind {
184 EventKind::Remove(kind) => {
186 self.remove_event_handler(event, kind, lock).await?;
187 }
188 EventKind::Create(kind) => {
190 self.create_event_handler(event, kind, lock).await?;
191 }
192 EventKind::Modify(kind) => {
194 self.modify_event_handler(event, kind, lock).await?;
195 }
196 EventKind::Any => {
198 warn!("unhandled event (Any): {:?}", event.paths);
199 }
200 EventKind::Other => {
201 warn!("unhandled event (Other): {:?}", event.paths);
202 }
203 EventKind::Access(_) => {}
204 }
205
206 Ok(())
207 }
208
209 async fn remove_event_handler(
211 &self,
212 event: DebouncedEvent,
213 kind: RemoveKind,
214 lock: Arc<Mutex<()>>,
215 ) -> anyhow::Result<()> {
216 match kind {
217 RemoveKind::File => {
218 if let Some(path) = event.paths.first() {
219 let guard = lock.lock().await;
220 match path.extension().map(|ext| ext.to_str()) {
221 Some(Some(ext)) if VALID_AUDIO_EXTENSIONS.contains(&ext) => {
222 info!("file removed: {:?}. removing from db", event.paths);
223 let song = Song::read_by_path(&self.db, path.clone()).await?;
224 if let Some(song) = song {
225 Song::delete(&self.db, song.id).await?;
226 }
227 }
228 _ => {
229 debug!(
230 "file removed: {:?}. not a song, no action needed",
231 event.paths
232 );
233 }
234 }
235 drop(guard);
236 }
237 }
238 RemoveKind::Folder => {} RemoveKind::Any | RemoveKind::Other => {
240 warn!(
241 "unhandled remove event: {:?}. rescan recommended",
242 event.paths
243 );
244 }
245 }
246
247 Ok(())
248 }
249
250 async fn create_event_handler(
252 &self,
253 event: DebouncedEvent,
254 kind: CreateKind,
255 lock: Arc<Mutex<()>>,
256 ) -> anyhow::Result<()> {
257 match kind {
258 CreateKind::File => {
259 if let Some(path) = event.paths.first() {
260 let guard = lock.lock().await;
261 match path.extension().map(|ext| ext.to_str()) {
262 Some(Some(ext)) if VALID_AUDIO_EXTENSIONS.contains(&ext) => {
263 info!("file created: {:?}. adding to db", event.paths);
264
265 let metadata = SongMetadata::load_from_path(
266 path.to_owned(),
267 &self.artist_name_separator,
268 &self.protected_artist_names,
269 self.genre_separator.as_deref(),
270 )?;
271
272 Song::try_load_into_db(&self.db, metadata).await?;
273 }
274 _ => {
275 debug!(
276 "file created: {:?}. not a song, no action needed",
277 event.paths
278 );
279 }
280 }
281 drop(guard);
282 }
283 }
284 CreateKind::Folder => {
285 debug!("folder created: {:?}. no action needed", event.paths);
286 }
287 CreateKind::Any | CreateKind::Other => {
288 warn!(
289 "unhandled create event: {:?}. rescan recommended",
290 event.paths
291 );
292 }
293 }
294 Ok(())
295 }
296
297 async fn modify_event_handler(
299 &self,
300 event: DebouncedEvent,
301 kind: ModifyKind,
302 lock: Arc<Mutex<()>>,
303 ) -> anyhow::Result<()> {
304 match kind {
305 ModifyKind::Data(kind) => if let Some(path) = event.paths.first() {
307 let guard = lock.lock().await;
308 match path.extension().map(|ext| ext.to_str()) {
309 Some(Some(ext)) if VALID_AUDIO_EXTENSIONS.contains(&ext) => {
310 info!("file data modified ({kind:?}): {:?}. updating in db", event.paths);
311
312 let song = Song::read_by_path(&self.db, path.clone()).await?.ok_or(mecomp_storage::errors::Error::NotFound)?;
314
315 let new_metadata: SongMetadata = SongMetadata::load_from_path(
316 path.to_owned(),
317 &self.artist_name_separator,
318 &self.protected_artist_names,
319 self.genre_separator.as_deref(),
320 )?;
321
322 let changeset = new_metadata.merge_with_song(&song);
323
324 Song::update(&self.db, song.id, changeset).await?;
325 }
326 _ => {
327 debug!("file data modified ({kind:?}): {:?}. not a song, no action needed", event.paths);
328 }
329 }
330 drop(guard);
331 },
332 ModifyKind::Name(RenameMode::Both) => {
334 let guard = lock.lock().await;
335 if let (Some(from_path),Some(to_path)) = (event.paths.first(), event.paths.get(1)) {
336 match (from_path.extension().map(|ext| ext.to_string_lossy()),to_path.extension().map(|ext| ext.to_string_lossy())) {
337 (Some(from_ext), Some(to_ext)) if VALID_AUDIO_EXTENSIONS.iter().any(|ext| *ext == from_ext) && VALID_AUDIO_EXTENSIONS.iter().any(|ext| *ext == to_ext) => {
338 info!("file name modified: {:?}. updating in db",
339 event.paths);
340
341 let song = Song::read_by_path(&self.db, from_path.clone()).await?.ok_or(mecomp_storage::errors::Error::NotFound)?;
343
344 Song::update(&self.db, song.id, SongChangeSet{
345 path: Some(to_path.clone()),
346 ..Default::default()
347 }).await?;
348
349 }
350 _ => {
351 debug!(
352 "file name modified: {:?}. not a song, no action needed",
353 event.paths
354 );
355 }
356 }
357 }
358 drop(guard);
359 }
360 ModifyKind::Name(
361 kind @ (
362 RenameMode::From | RenameMode::To )) => {
365 warn!(
366 "file name modified ({kind:?}): {:?}. not enough info to handle properly, rescan recommended",
367 event.paths
368 );
369 }
370 ModifyKind::Name(RenameMode::Other | RenameMode::Any) => {
371 warn!(
372 "unhandled file name modification: {:?}. rescan recommended",
373 event.paths
374 );
375 }
376 ModifyKind::Metadata(
378 MetadataKind::AccessTime
379 | MetadataKind::WriteTime
380 | MetadataKind::Ownership
381 | MetadataKind::Permissions,
382 ) => {}
383 ModifyKind::Metadata(kind@(MetadataKind::Any | MetadataKind::Other | MetadataKind::Extended)) => {
384 warn!(
385 "unhandled metadata modification ({kind:?}): {:?}. rescan recommended",
386 event.paths
387 );
388 }
389 ModifyKind::Any | ModifyKind::Other => {
391 warn!(
392 "unhandled modify event: {:?}. rescan recommended",
393 event.paths
394 );
395 }
396 }
397 Ok(())
398 }
399}
400
#[cfg(test)]
mod tests {
    use crate::test_utils::init;

    use super::*;

    use lofty::file::AudioFile;
    use pretty_assertions::assert_eq;
    use rstest::{fixture, rstest};
    use tempfile::{TempDir, tempdir};

    use mecomp_storage::test_utils::{
        ARTIST_NAME_SEPARATOR, arb_song_case, create_song_metadata, init_test_database,
    };

    /// Creates a temporary music library, a test database, and a watcher on that library.
    #[fixture]
    async fn setup() -> (TempDir, Arc<Surreal<Db>>, MusicLibEventHandlerGuard) {
        init();
        let music_lib = tempdir().expect("Failed to create temporary directory");
        let db = Arc::new(init_test_database().await.unwrap());
        let interrupt = InterruptReceiver::dummy();
        let handler = init_music_library_watcher(
            db.clone(),
            &[music_lib.path().to_owned()],
            ARTIST_NAME_SEPARATOR.to_string().into(),
            OneOrMany::None,
            Some(ARTIST_NAME_SEPARATOR.into()),
            interrupt,
        )
        .expect("Failed to create music library watcher");

        (music_lib, db, handler)
    }

    /// Polls the database until it contains at least one song.
    ///
    /// There is no internal deadline; callers rely on their `#[timeout]` attribute.
    async fn wait_for_first_song(db: &Arc<Surreal<Db>>) {
        while Song::read_all(db).await.unwrap().is_empty() {
            tokio::time::sleep(Duration::from_millis(100)).await;
        }
    }

    #[rstest]
    #[timeout(Duration::from_secs(5))]
    #[tokio::test]
    async fn test_create_song(
        #[future] setup: (TempDir, Arc<Surreal<Db>>, MusicLibEventHandlerGuard),
    ) {
        let (music_lib, db, handler) = setup.await;

        // Creating the file inside the watched library should get it indexed.
        let metadata = create_song_metadata(&music_lib, arb_song_case()()).unwrap();

        wait_for_first_song(&db).await;

        let path = metadata.path.clone();
        let song = Song::read_by_path(&db, path).await.unwrap().unwrap();

        assert_eq!(metadata, song.into());

        handler.stop();
        music_lib.close().unwrap();
    }

    #[rstest]
    #[timeout(Duration::from_secs(10))]
    #[tokio::test]
    async fn test_rename_song(
        #[future] setup: (TempDir, Arc<Surreal<Db>>, MusicLibEventHandlerGuard),
    ) {
        let (music_lib, db, handler) = setup.await;

        let metadata = create_song_metadata(&music_lib, arb_song_case()()).unwrap();

        wait_for_first_song(&db).await;

        let path = metadata.path.clone();
        let song = Song::read_by_path(&db, path.clone())
            .await
            .unwrap()
            .unwrap();

        assert_eq!(metadata, song.clone().into());

        let new_path = music_lib.path().join("new_song.mp3");
        std::fs::rename(&path, &new_path).unwrap();

        // Wait until the watcher has recorded the new path.
        while Song::read_by_path(&db, new_path.clone())
            .await
            .unwrap()
            .is_none()
        {
            tokio::time::sleep(Duration::from_millis(100)).await;
        }

        // A rename must keep the same record, not create a new one.
        let new_song = Song::read_by_path(&db, new_path.clone())
            .await
            .unwrap()
            .unwrap();
        assert_eq!(song.id, new_song.id);

        handler.stop();
        music_lib.close().unwrap();
    }

    /// Rewrites the title tag of the audio file at `path` to `new_name`.
    fn modify_song_metadata(path: &std::path::Path, new_name: String) -> anyhow::Result<()> {
        use lofty::{file::TaggedFileExt, tag::Accessor};
        let mut tagged_file = lofty::probe::Probe::open(path)?.read()?;
        let tag = tagged_file
            .primary_tag_mut()
            .ok_or_else(|| anyhow::anyhow!("ERROR: No tags found"))?;
        tag.set_title(new_name);
        tagged_file.save_to_path(path, lofty::config::WriteOptions::default())?;
        Ok(())
    }

    #[rstest]
    #[timeout(Duration::from_secs(10))]
    #[tokio::test]
    async fn test_modify_song(
        #[future] setup: (TempDir, Arc<Surreal<Db>>, MusicLibEventHandlerGuard),
    ) {
        let (music_lib, db, handler) = setup.await;

        let metadata = create_song_metadata(&music_lib, arb_song_case()()).unwrap();

        wait_for_first_song(&db).await;

        let path = metadata.path.clone();
        let song = Song::read_by_path(&db, path.clone())
            .await
            .unwrap()
            .unwrap();

        assert_eq!(metadata, song.clone().into());

        modify_song_metadata(&path, "new song name".to_string()).unwrap();

        // Wait until the new title shows up in the database.
        while Song::read_by_path(&db, path.clone())
            .await
            .unwrap()
            .unwrap()
            .title
            != "new song name"
        {
            tokio::time::sleep(Duration::from_millis(100)).await;
        }

        handler.stop();
        music_lib.close().unwrap();
    }

    #[rstest]
    #[timeout(Duration::from_secs(10))]
    #[tokio::test]
    async fn test_remove_song(
        #[future] setup: (TempDir, Arc<Surreal<Db>>, MusicLibEventHandlerGuard),
    ) {
        let (music_lib, db, handler) = setup.await;

        let metadata = create_song_metadata(&music_lib, arb_song_case()()).unwrap();

        wait_for_first_song(&db).await;

        let path = metadata.path.clone();
        let song = Song::read_by_path(&db, path.clone())
            .await
            .unwrap()
            .unwrap();

        assert_eq!(metadata, song.clone().into());

        std::fs::remove_file(&path).unwrap();

        // Wait until the song has disappeared from the database.
        while Song::read_by_path(&db, path.clone())
            .await
            .unwrap()
            .is_some()
        {
            tokio::time::sleep(Duration::from_millis(100)).await;
        }

        handler.stop();
        music_lib.close().unwrap();
    }

    #[rstest]
    #[tokio::test]
    async fn test_remove_empty_folder(
        #[future] setup: (TempDir, Arc<Surreal<Db>>, MusicLibEventHandlerGuard),
    ) {
        let (music_lib, _, handler) = setup.await;

        let empty_folder = music_lib.path().join("empty_folder");
        std::fs::create_dir(&empty_folder).unwrap();

        // Let the creation event pass through the debouncer first.
        tokio::time::sleep(Duration::from_secs(1)).await;

        // Remove the folder while the watcher is still running, so the removal event is
        // actually delivered to the handler; this must not cause any issues.
        std::fs::remove_dir(&empty_folder).unwrap();
        tokio::time::sleep(Duration::from_secs(1)).await;

        handler.stop();
        music_lib.close().unwrap();
    }
}