1use std::{path::PathBuf, sync::Arc, time::Duration};
7
8use futures::FutureExt;
9use futures::StreamExt;
10use log::{debug, error, info, trace, warn};
11use mecomp_storage::db::schemas::song::{Song, SongChangeSet, SongMetadata};
12#[cfg(target_os = "macos")]
13use notify::FsEventWatcher;
14#[cfg(target_os = "linux")]
15use notify::INotifyWatcher;
16#[cfg(target_os = "windows")]
17use notify::ReadDirectoryChangesWatcher;
18use notify::{
19 event::{CreateKind, MetadataKind, ModifyKind, RemoveKind, RenameMode},
20 EventKind, RecursiveMode,
21};
22use notify_debouncer_full::RecommendedCache;
23use notify_debouncer_full::{new_debouncer, DebouncedEvent, Debouncer};
24use one_or_many::OneOrMany;
25use surrealdb::{engine::local::Db, Surreal};
26
/// The concrete `notify` watcher backend used by the debouncer on Linux.
#[cfg(target_os = "linux")]
type WatcherType = INotifyWatcher;
/// The concrete `notify` watcher backend used by the debouncer on macOS.
#[cfg(target_os = "macos")]
type WatcherType = FsEventWatcher;
/// The concrete `notify` watcher backend used by the debouncer on Windows.
#[cfg(target_os = "windows")]
type WatcherType = ReadDirectoryChangesWatcher;
33
/// File extensions (without the leading dot) that the event handlers treat as songs.
/// Events for files with any other extension are logged and otherwise ignored.
const VALID_AUDIO_EXTENSIONS: [&str; 4] = ["mp3", "wav", "ogg", "flac"];
35
/// Timeout handed to `new_debouncer` when the library watcher is created.
pub const MAX_DEBOUNCE_TIME: Duration = Duration::from_millis(500);
37
38#[allow(clippy::missing_inline_in_public_items)]
58pub fn init_music_library_watcher(
59 db: Arc<Surreal<Db>>,
60 library_paths: &[PathBuf],
61 artist_name_separator: OneOrMany<String>,
62 genre_separator: Option<String>,
63) -> anyhow::Result<MusicLibEventHandlerGuard> {
64 let (tx, rx) = futures::channel::mpsc::unbounded();
65 let (stop_tx, stop_rx) = futures::channel::oneshot::channel();
67
68 std::thread::Builder::new()
70 .name(String::from("Music Library Watcher"))
71 .spawn(move || {
72 let handler = MusicLibEventHandler::new(db, artist_name_separator, genre_separator);
73 futures::executor::block_on(async {
74 let mut stop_rx = stop_rx.fuse();
75 let mut rx = rx.fuse();
76
77 loop {
78 futures::select! {
79 _ = stop_rx => {
80 debug!("stopping watcher");
81 break;
82 }
83 result = rx.select_next_some() => {
84 match result {
85 Ok(events) => {
86 for event in events {
87 if let Err(e) = handler.handle_event(event).await {
88 error!("failed to handle event: {:?}", e);
89 }
90 }
91 }
92 Err(errors) => {
93 for error in errors {
94 error!("watch error: {error:?}");
95 }
96 }
97 }
98 }
99 }
100 }
101 });
102 })?;
103
104 let mut debouncer: Debouncer<WatcherType, _> =
107 new_debouncer(MAX_DEBOUNCE_TIME, None, move |event| {
108 let _ = tx.unbounded_send(event);
109 })?;
110
111 for path in library_paths {
113 log::debug!("watching path: {:?}", path);
114 debouncer.watch(path, RecursiveMode::Recursive)?;
117 }
118
119 Ok(MusicLibEventHandlerGuard { debouncer, stop_tx })
120}
121
/// Handle returned by `init_music_library_watcher`.
///
/// It owns the debouncer and the sending half of the stop channel whose
/// receiving half is polled by the watcher thread.
pub struct MusicLibEventHandlerGuard {
    /// The debouncer that watches the library paths.
    debouncer: Debouncer<WatcherType, RecommendedCache>,
    /// Sending half of the one-shot channel that tells the watcher thread to exit.
    stop_tx: futures::channel::oneshot::Sender<()>,
}
126
127impl MusicLibEventHandlerGuard {
128 #[inline]
129 pub fn stop(self) {
130 let Self { debouncer, stop_tx } = self;
131 stop_tx.send(()).ok();
132 debouncer.stop();
133 }
134}
135
/// Applies file-system events from the watched music library to the database.
struct MusicLibEventHandler {
    /// Database the song records are read from and written to.
    db: Arc<Surreal<Db>>,
    /// Passed to `SongMetadata::load_from_path` when a song file is read.
    artist_name_separator: OneOrMany<String>,
    /// Passed to `SongMetadata::load_from_path` when a song file is read.
    genre_separator: Option<String>,
}
142
143impl MusicLibEventHandler {
144 pub const fn new(
146 db: Arc<Surreal<Db>>,
147 artist_name_separator: OneOrMany<String>,
148 genre_separator: Option<String>,
149 ) -> Self {
150 Self {
151 db,
152 artist_name_separator,
153 genre_separator,
154 }
155 }
156
157 async fn handle_event(&self, event: DebouncedEvent) -> anyhow::Result<()> {
159 trace!("file event detected: {event:?}");
160
161 match event.kind {
162 EventKind::Remove(kind) => {
164 self.remove_event_handler(event, kind).await?;
165 }
166 EventKind::Create(kind) => {
168 self.create_event_handler(event, kind).await?;
169 }
170 EventKind::Modify(kind) => {
172 self.modify_event_handler(event, kind).await?;
173 }
174 EventKind::Any => {
176 warn!("unhandled event (Any): {:?}", event.paths);
177 }
178 EventKind::Other => {
179 warn!("unhandled event (Other): {:?}", event.paths);
180 }
181 EventKind::Access(_) => {}
182 }
183
184 Ok(())
185 }
186
187 async fn remove_event_handler(
189 &self,
190 event: DebouncedEvent,
191 kind: RemoveKind,
192 ) -> anyhow::Result<()> {
193 match kind {
194 RemoveKind::File => {
195 if let Some(path) = event.paths.first() {
196 match path.extension().map(|ext| ext.to_str()) {
197 Some(Some(ext)) if VALID_AUDIO_EXTENSIONS.contains(&ext) => {
198 info!("file removed: {:?}. removing from db", event.paths);
199 let song = Song::read_by_path(&self.db, path.clone()).await?;
200 if let Some(song) = song {
201 Song::delete(&self.db, song.id).await?;
202 }
203 }
204 _ => {
205 debug!(
206 "file removed: {:?}. not a song, no action needed",
207 event.paths
208 );
209 }
210 }
211 }
212 }
213 RemoveKind::Folder => {} RemoveKind::Any | RemoveKind::Other => {
215 warn!(
216 "unhandled remove event: {:?}. rescan recommended",
217 event.paths
218 );
219 }
220 }
221
222 Ok(())
223 }
224
225 async fn create_event_handler(
227 &self,
228 event: DebouncedEvent,
229 kind: CreateKind,
230 ) -> anyhow::Result<()> {
231 match kind {
232 CreateKind::File => {
233 if let Some(path) = event.paths.first() {
234 match path.extension().map(|ext| ext.to_str()) {
235 Some(Some(ext)) if VALID_AUDIO_EXTENSIONS.contains(&ext) => {
236 info!("file created: {:?}. adding to db", event.paths);
237
238 let metadata = SongMetadata::load_from_path(
239 path.to_owned(),
240 &self.artist_name_separator,
241 self.genre_separator.as_deref(),
242 )?;
243
244 Song::try_load_into_db(&self.db, metadata).await?;
245 }
246 _ => {
247 debug!(
248 "file created: {:?}. not a song, no action needed",
249 event.paths
250 );
251 }
252 }
253 }
254 }
255 CreateKind::Folder => {
256 debug!("folder created: {:?}. no action needed", event.paths);
257 }
258 CreateKind::Any | CreateKind::Other => {
259 warn!(
260 "unhandled create event: {:?}. rescan recommended",
261 event.paths
262 );
263 }
264 }
265 Ok(())
266 }
267
268 async fn modify_event_handler(
270 &self,
271 event: DebouncedEvent,
272 kind: ModifyKind,
273 ) -> anyhow::Result<()> {
274 match kind {
275 ModifyKind::Data(kind) => if let Some(path) = event.paths.first() {
277 match path.extension().map(|ext| ext.to_str()) {
278 Some(Some(ext)) if VALID_AUDIO_EXTENSIONS.contains(&ext) => {
279 info!("file data modified ({kind:?}): {:?}. updating in db", event.paths);
280
281 let song = Song::read_by_path(&self.db, path.clone()).await?.ok_or(mecomp_storage::errors::Error::NotFound)?;
283
284 let new_metadata: SongMetadata = SongMetadata::load_from_path(
285 path.to_owned(),
286 &self.artist_name_separator,
287 self.genre_separator.as_deref(),
288 )?;
289
290 let changeset = new_metadata.merge_with_song(&song);
291
292 Song::update(&self.db, song.id, changeset).await?;
293 }
294 _ => {
295 debug!("file data modified ({kind:?}): {:?}. not a song, no action needed", event.paths);
296 }
297 }
298 },
299 ModifyKind::Name(RenameMode::Both) => {
301 if let (Some(from_path),Some(to_path)) = (event.paths.first(), event.paths.get(1)) {
302 match (from_path.extension().map(|ext| ext.to_string_lossy()),to_path.extension().map(|ext| ext.to_string_lossy())) {
303 (Some(from_ext), Some(to_ext)) if VALID_AUDIO_EXTENSIONS.iter().any(|ext| *ext == from_ext) && VALID_AUDIO_EXTENSIONS.iter().any(|ext| *ext == to_ext) => {
304 info!("file name modified: {:?}. updating in db",
305 event.paths);
306
307 let song = Song::read_by_path(&self.db, from_path.clone()).await?.ok_or(mecomp_storage::errors::Error::NotFound)?;
309
310 Song::update(&self.db, song.id, SongChangeSet{
311 path: Some(to_path.clone()),
312 ..Default::default()
313 }).await?;
314
315 }
316 _ => {
317 debug!(
318 "file name modified: {:?}. not a song, no action needed",
319 event.paths
320 );
321 }
322 }
323 }
324
325 }
326 ModifyKind::Name(
327 kind @ (
328 RenameMode::From | RenameMode::To )) => {
331 warn!(
332 "file name modified ({kind:?}): {:?}. not enough info to handle properly, rescan recommended",
333 event.paths
334 );
335 }
336 ModifyKind::Name(RenameMode::Other | RenameMode::Any) => {
337 warn!(
338 "unhandled file name modification: {:?}. rescan recommended",
339 event.paths
340 );
341 }
342 ModifyKind::Metadata(
344 MetadataKind::AccessTime
345 | MetadataKind::WriteTime
346 | MetadataKind::Ownership
347 | MetadataKind::Permissions,
348 ) => {}
349 ModifyKind::Metadata(kind@(MetadataKind::Any | MetadataKind::Other | MetadataKind::Extended)) => {
350 warn!(
351 "unhandled metadata modification ({kind:?}): {:?}. rescan recommended",
352 event.paths
353 );
354 }
355 ModifyKind::Any | ModifyKind::Other => {
357 warn!(
358 "unhandled modify event: {:?}. rescan recommended",
359 event.paths
360 );
361 }
362 }
363 Ok(())
364 }
365}
366
#[cfg(test)]
mod tests {
    //! End-to-end tests for the library watcher: each test changes files in a
    //! temporary directory and polls the database until the watcher thread has
    //! applied the change (or the per-test timeout fires).

    use crate::test_utils::init;

    use super::*;

    use lofty::file::AudioFile;
    use pretty_assertions::assert_eq;
    use rstest::{fixture, rstest};
    use tempfile::{tempdir, TempDir};

    use mecomp_storage::test_utils::{
        arb_song_case, create_song_metadata, init_test_database, ARTIST_NAME_SEPARATOR,
    };

    /// How long the tests sleep between two looks at the database.
    const POLL_INTERVAL: Duration = Duration::from_millis(100);

    /// Creates a temporary music library, a test database, and a watcher
    /// observing that library.
    #[fixture]
    async fn setup() -> (TempDir, Arc<Surreal<Db>>, MusicLibEventHandlerGuard) {
        init();
        let music_lib = tempdir().expect("Failed to create temporary directory");
        let db = Arc::new(init_test_database().await.unwrap());
        let handler = init_music_library_watcher(
            db.clone(),
            &[music_lib.path().to_owned()],
            OneOrMany::One(ARTIST_NAME_SEPARATOR.into()),
            Some(ARTIST_NAME_SEPARATOR.into()),
        )
        .expect("Failed to create music library watcher");

        (music_lib, db, handler)
    }

    /// Polls the database until it contains at least one song.
    async fn wait_for_any_song(db: &Arc<Surreal<Db>>) {
        while Song::read_all(db).await.unwrap().is_empty() {
            tokio::time::sleep(POLL_INTERVAL).await;
        }
    }

    /// A song file created in the library ends up in the database.
    #[rstest]
    #[timeout(Duration::from_secs(5))]
    #[tokio::test]
    async fn test_create_song(
        #[future] setup: (TempDir, Arc<Surreal<Db>>, MusicLibEventHandlerGuard),
    ) {
        let (music_lib, db, handler) = setup.await;

        // create a new song file inside the watched directory
        let metadata = create_song_metadata(&music_lib, arb_song_case()()).unwrap();

        // wait for the watcher to pick it up
        wait_for_any_song(&db).await;

        let path = metadata.path.clone();
        let song = Song::read_by_path(&db, path).await.unwrap().unwrap();

        // the stored song matches what was written to disk
        assert_eq!(metadata, song.into());

        // clean up
        handler.stop();
        music_lib.close().unwrap();
    }

    /// Renaming a song file updates the stored path and keeps the song's id.
    #[rstest]
    #[timeout(Duration::from_secs(5))]
    #[tokio::test]
    async fn test_rename_song(
        #[future] setup: (TempDir, Arc<Surreal<Db>>, MusicLibEventHandlerGuard),
    ) {
        let (music_lib, db, handler) = setup.await;

        // create a new song file and wait for it to be indexed
        let metadata = create_song_metadata(&music_lib, arb_song_case()()).unwrap();
        wait_for_any_song(&db).await;

        let path = metadata.path.clone();
        let song = Song::read_by_path(&db, path.clone())
            .await
            .unwrap()
            .unwrap();

        assert_eq!(metadata, song.clone().into());

        // rename the song file
        let new_path = music_lib.path().join("new_song.mp3");
        std::fs::rename(&path, &new_path).unwrap();

        // wait until the song can be found under its new path
        while Song::read_by_path(&db, new_path.clone())
            .await
            .unwrap()
            .is_none()
        {
            tokio::time::sleep(POLL_INTERVAL).await;
        }

        // it must still be the same database record
        let new_song = Song::read_by_path(&db, new_path.clone())
            .await
            .unwrap()
            .unwrap();
        assert_eq!(song.id, new_song.id);

        // clean up
        handler.stop();
        music_lib.close().unwrap();
    }

    /// Rewrites the title tag of the audio file at `path` to `new_name`.
    fn modify_song_metadata(path: PathBuf, new_name: String) -> anyhow::Result<()> {
        use lofty::{file::TaggedFileExt, tag::Accessor};
        let mut tagged_file = lofty::probe::Probe::open(&path)?.read()?;
        let tag = tagged_file
            .primary_tag_mut()
            .ok_or(anyhow::anyhow!("ERROR: No tags found"))?;
        tag.set_title(new_name);
        tagged_file.save_to_path(&path, lofty::config::WriteOptions::default())?;
        Ok(())
    }

    /// Changing a song file's tags updates the stored song.
    #[rstest]
    #[timeout(Duration::from_secs(5))]
    #[tokio::test]
    async fn test_modify_song(
        #[future] setup: (TempDir, Arc<Surreal<Db>>, MusicLibEventHandlerGuard),
    ) {
        let (music_lib, db, handler) = setup.await;

        // create a new song file and wait for it to be indexed
        let metadata = create_song_metadata(&music_lib, arb_song_case()()).unwrap();
        wait_for_any_song(&db).await;

        let path = metadata.path.clone();
        let song = Song::read_by_path(&db, path.clone())
            .await
            .unwrap()
            .unwrap();

        assert_eq!(metadata, song.clone().into());

        // change the title stored in the file
        modify_song_metadata(path.clone(), "new song name".to_string()).unwrap();

        // wait until the new title shows up in the database
        while Song::read_by_path(&db, path.clone())
            .await
            .unwrap()
            .unwrap()
            .title
            != "new song name".into()
        {
            tokio::time::sleep(POLL_INTERVAL).await;
        }

        // clean up
        handler.stop();
        music_lib.close().unwrap();
    }

    /// Deleting a song file removes the song from the database.
    #[rstest]
    #[timeout(Duration::from_secs(5))]
    #[tokio::test]
    async fn test_remove_song(
        #[future] setup: (TempDir, Arc<Surreal<Db>>, MusicLibEventHandlerGuard),
    ) {
        let (music_lib, db, handler) = setup.await;

        // create a new song file and wait for it to be indexed
        let metadata = create_song_metadata(&music_lib, arb_song_case()()).unwrap();
        wait_for_any_song(&db).await;

        let path = metadata.path.clone();
        let song = Song::read_by_path(&db, path.clone())
            .await
            .unwrap()
            .unwrap();

        assert_eq!(metadata, song.clone().into());

        // remove the song file
        std::fs::remove_file(&path).unwrap();

        // wait until the song has disappeared from the database
        while Song::read_by_path(&db, path.clone())
            .await
            .unwrap()
            .is_some()
        {
            tokio::time::sleep(POLL_INTERVAL).await;
        }

        // clean up
        handler.stop();
        music_lib.close().unwrap();
    }

    /// Creating and then removing an empty folder leaves the database empty.
    #[rstest]
    #[timeout(Duration::from_secs(5))]
    #[tokio::test]
    async fn test_remove_empty_folder(
        #[future] setup: (TempDir, Arc<Surreal<Db>>, MusicLibEventHandlerGuard),
    ) {
        let (music_lib, db, handler) = setup.await;

        // create an empty folder
        let empty_folder = music_lib.path().join("empty_folder");
        std::fs::create_dir(&empty_folder).unwrap();

        // give the debouncer time to flush the creation, so that the removal
        // below is intended to arrive as a separate event
        tokio::time::sleep(MAX_DEBOUNCE_TIME * 2).await;

        // remove the folder while the watcher is still running
        std::fs::remove_dir(&empty_folder).unwrap();

        // give the watcher time to process the removal
        tokio::time::sleep(MAX_DEBOUNCE_TIME * 2).await;

        // neither event may have added anything to the database
        assert!(Song::read_all(&db).await.unwrap().is_empty());

        // clean up
        handler.stop();
        music_lib.close().unwrap();
    }
}