1use std::{path::PathBuf, sync::Arc, time::Duration};
7
8use futures::FutureExt;
9use futures::StreamExt;
10use log::{debug, error, info, trace, warn};
11use mecomp_storage::db::schemas::song::{Song, SongChangeSet, SongMetadata};
12#[cfg(target_os = "macos")]
13use notify::FsEventWatcher;
14#[cfg(target_os = "linux")]
15use notify::INotifyWatcher;
16#[cfg(target_os = "windows")]
17use notify::ReadDirectoryChangesWatcher;
18use notify::{
19 event::{CreateKind, MetadataKind, ModifyKind, RemoveKind, RenameMode},
20 EventKind, RecursiveMode,
21};
22use notify_debouncer_full::RecommendedCache;
23use notify_debouncer_full::{new_debouncer, DebouncedEvent, Debouncer};
24use one_or_many::OneOrMany;
25use surrealdb::{engine::local::Db, Surreal};
26
27#[cfg(target_os = "linux")]
28type WatcherType = INotifyWatcher;
29#[cfg(target_os = "macos")]
30type WatcherType = FsEventWatcher;
31#[cfg(target_os = "windows")]
32type WatcherType = ReadDirectoryChangesWatcher;
33
/// File extensions (without the leading dot) that the watcher treats as songs.
///
/// Events for paths whose extension is not in this list are logged and ignored.
const VALID_AUDIO_EXTENSIONS: [&str; 4] = ["mp3", "wav", "ogg", "flac"];
35
/// Timeout handed to the file-system event debouncer (500 ms).
///
/// Passed as the first argument to `new_debouncer` when the watcher is created.
pub const MAX_DEBOUNCE_TIME: Duration = Duration::from_millis(500);
37
38pub fn init_music_library_watcher(
58 db: Arc<Surreal<Db>>,
59 library_paths: &[PathBuf],
60 artist_name_separator: OneOrMany<String>,
61 genre_separator: Option<String>,
62) -> anyhow::Result<MusicLibEventHandlerGuard> {
63 let (tx, rx) = futures::channel::mpsc::unbounded();
64 let (stop_tx, stop_rx) = futures::channel::oneshot::channel();
66
67 std::thread::Builder::new()
69 .name(String::from("Music Library Watcher"))
70 .spawn(move || {
71 let handler = MusicLibEventHandler::new(db, artist_name_separator, genre_separator);
72 futures::executor::block_on(async {
73 let mut stop_rx = stop_rx.fuse();
74 let mut rx = rx.fuse();
75
76 loop {
77 futures::select! {
78 _ = stop_rx => {
79 debug!("stopping watcher");
80 break;
81 }
82 result = rx.select_next_some() => {
83 match result {
84 Ok(events) => {
85 for event in events {
86 if let Err(e) = handler.handle_event(event).await {
87 error!("failed to handle event: {:?}", e);
88 }
89 }
90 }
91 Err(errors) => {
92 for error in errors {
93 error!("watch error: {error:?}");
94 }
95 }
96 }
97 }
98 }
99 }
100 });
101 })?;
102
103 let mut debouncer: Debouncer<WatcherType, _> =
106 new_debouncer(MAX_DEBOUNCE_TIME, None, move |event| {
107 let _ = tx.unbounded_send(event);
108 })?;
109
110 for path in library_paths {
112 log::debug!("watching path: {:?}", path);
113 debouncer.watch(path, RecursiveMode::Recursive)?;
116 }
117
118 Ok(MusicLibEventHandlerGuard { debouncer, stop_tx })
119}
120
121pub struct MusicLibEventHandlerGuard {
122 debouncer: Debouncer<WatcherType, RecommendedCache>,
123 stop_tx: futures::channel::oneshot::Sender<()>,
124}
125
126impl MusicLibEventHandlerGuard {
127 pub fn stop(self) {
128 let Self { debouncer, stop_tx } = self;
129 stop_tx.send(()).ok();
130 debouncer.stop();
131 }
132}
133
134struct MusicLibEventHandler {
136 db: Arc<Surreal<Db>>,
137 artist_name_separator: OneOrMany<String>,
138 genre_separator: Option<String>,
139}
140
141impl MusicLibEventHandler {
142 pub const fn new(
144 db: Arc<Surreal<Db>>,
145 artist_name_separator: OneOrMany<String>,
146 genre_separator: Option<String>,
147 ) -> Self {
148 Self {
149 db,
150 artist_name_separator,
151 genre_separator,
152 }
153 }
154
155 async fn handle_event(&self, event: DebouncedEvent) -> anyhow::Result<()> {
157 trace!("file event detected: {event:?}");
158
159 match event.kind {
160 EventKind::Remove(kind) => {
162 self.remove_event_handler(event, kind).await?;
163 }
164 EventKind::Create(kind) => {
166 self.create_event_handler(event, kind).await?;
167 }
168 EventKind::Modify(kind) => {
170 self.modify_event_handler(event, kind).await?;
171 }
172 EventKind::Any => {
174 warn!("unhandled event (Any): {:?}", event.paths);
175 }
176 EventKind::Other => {
177 warn!("unhandled event (Other): {:?}", event.paths);
178 }
179 EventKind::Access(_) => {}
180 }
181
182 Ok(())
183 }
184
185 async fn remove_event_handler(
187 &self,
188 event: DebouncedEvent,
189 kind: RemoveKind,
190 ) -> anyhow::Result<()> {
191 match kind {
192 RemoveKind::File => {
193 if let Some(path) = event.paths.first() {
194 match path.extension().map(|ext| ext.to_str()) {
195 Some(Some(ext)) if VALID_AUDIO_EXTENSIONS.contains(&ext) => {
196 info!("file removed: {:?}. removing from db", event.paths);
197 let song = Song::read_by_path(&self.db, path.clone()).await?;
198 if let Some(song) = song {
199 Song::delete(&self.db, song.id).await?;
200 }
201 }
202 _ => {
203 debug!(
204 "file removed: {:?}. not a song, no action needed",
205 event.paths
206 );
207 }
208 }
209 }
210 }
211 RemoveKind::Folder => {} RemoveKind::Any | RemoveKind::Other => {
213 warn!(
214 "unhandled remove event: {:?}. rescan recommended",
215 event.paths
216 );
217 }
218 }
219
220 Ok(())
221 }
222
223 async fn create_event_handler(
225 &self,
226 event: DebouncedEvent,
227 kind: CreateKind,
228 ) -> anyhow::Result<()> {
229 match kind {
230 CreateKind::File => {
231 if let Some(path) = event.paths.first() {
232 match path.extension().map(|ext| ext.to_str()) {
233 Some(Some(ext)) if VALID_AUDIO_EXTENSIONS.contains(&ext) => {
234 info!("file created: {:?}. adding to db", event.paths);
235
236 let metadata = SongMetadata::load_from_path(
237 path.to_owned(),
238 &self.artist_name_separator,
239 self.genre_separator.as_deref(),
240 )?;
241
242 Song::try_load_into_db(&self.db, metadata).await?;
243 }
244 _ => {
245 debug!(
246 "file created: {:?}. not a song, no action needed",
247 event.paths
248 );
249 }
250 }
251 }
252 }
253 CreateKind::Folder => {
254 debug!("folder created: {:?}. no action needed", event.paths);
255 }
256 CreateKind::Any | CreateKind::Other => {
257 warn!(
258 "unhandled create event: {:?}. rescan recommended",
259 event.paths
260 );
261 }
262 }
263 Ok(())
264 }
265
266 async fn modify_event_handler(
268 &self,
269 event: DebouncedEvent,
270 kind: ModifyKind,
271 ) -> anyhow::Result<()> {
272 match kind {
273 ModifyKind::Data(kind) => if let Some(path) = event.paths.first() {
275 match path.extension().map(|ext| ext.to_str()) {
276 Some(Some(ext)) if VALID_AUDIO_EXTENSIONS.contains(&ext) => {
277 info!("file data modified ({kind:?}): {:?}. updating in db", event.paths);
278
279 let song = Song::read_by_path(&self.db, path.clone()).await?.ok_or(mecomp_storage::errors::Error::NotFound)?;
281
282 let new_metadata: SongMetadata = SongMetadata::load_from_path(
283 path.to_owned(),
284 &self.artist_name_separator,
285 self.genre_separator.as_deref(),
286 )?;
287
288 let changeset = new_metadata.merge_with_song(&song);
289
290 Song::update(&self.db, song.id, changeset).await?;
291 }
292 _ => {
293 debug!("file data modified ({kind:?}): {:?}. not a song, no action needed", event.paths);
294 }
295 }
296 },
297 ModifyKind::Name(RenameMode::Both) => {
299 if let (Some(from_path),Some(to_path)) = (event.paths.first(), event.paths.get(1)) {
300 match (from_path.extension().map(|ext| ext.to_string_lossy()),to_path.extension().map(|ext| ext.to_string_lossy())) {
301 (Some(from_ext), Some(to_ext)) if VALID_AUDIO_EXTENSIONS.iter().any(|ext| *ext == from_ext) && VALID_AUDIO_EXTENSIONS.iter().any(|ext| *ext == to_ext) => {
302 info!("file name modified: {:?}. updating in db",
303 event.paths);
304
305 let song = Song::read_by_path(&self.db, from_path.clone()).await?.ok_or(mecomp_storage::errors::Error::NotFound)?;
307
308 Song::update(&self.db, song.id, SongChangeSet{
309 path: Some(to_path.clone()),
310 ..Default::default()
311 }).await?;
312
313 }
314 _ => {
315 debug!(
316 "file name modified: {:?}. not a song, no action needed",
317 event.paths
318 );
319 }
320 }
321 }
322
323 }
324 ModifyKind::Name(
325 kind @ (
326 RenameMode::From | RenameMode::To )) => {
329 warn!(
330 "file name modified ({kind:?}): {:?}. not enough info to handle properly, rescan recommended",
331 event.paths
332 );
333 }
334 ModifyKind::Name(RenameMode::Other | RenameMode::Any) => {
335 warn!(
336 "unhandled file name modification: {:?}. rescan recommended",
337 event.paths
338 );
339 }
340 ModifyKind::Metadata(
342 MetadataKind::AccessTime
343 | MetadataKind::WriteTime
344 | MetadataKind::Ownership
345 | MetadataKind::Permissions,
346 ) => {}
347 ModifyKind::Metadata(kind@(MetadataKind::Any | MetadataKind::Other | MetadataKind::Extended)) => {
348 warn!(
349 "unhandled metadata modification ({kind:?}): {:?}. rescan recommended",
350 event.paths
351 );
352 }
353 ModifyKind::Any | ModifyKind::Other => {
355 warn!(
356 "unhandled modify event: {:?}. rescan recommended",
357 event.paths
358 );
359 }
360 }
361 Ok(())
362 }
363}
364
/// End-to-end tests: each one starts a real watcher on a temporary directory,
/// changes files on disk and polls the database until the change shows up.
#[cfg(test)]
mod tests {
    use crate::test_utils::init;

    use super::*;

    use lofty::file::AudioFile;
    use pretty_assertions::assert_eq;
    use rstest::{fixture, rstest};
    use tempfile::{tempdir, TempDir};

    use mecomp_storage::test_utils::{
        arb_song_case, create_song_metadata, init_test_database, ARTIST_NAME_SEPARATOR,
    };

    /// Creates an empty temporary music library, a test database and a
    /// watcher observing that library.
    #[fixture]
    async fn setup() -> (TempDir, Arc<Surreal<Db>>, MusicLibEventHandlerGuard) {
        init();
        let music_lib = tempdir().expect("Failed to create temporary directory");
        let db = Arc::new(init_test_database().await.unwrap());
        let handler = init_music_library_watcher(
            db.clone(),
            &[music_lib.path().to_owned()],
            OneOrMany::One(ARTIST_NAME_SEPARATOR.into()),
            Some(ARTIST_NAME_SEPARATOR.into()),
        )
        .expect("Failed to create music library watcher");

        (music_lib, db, handler)
    }

    /// A song file created in the library ends up in the database.
    #[rstest]
    #[timeout(Duration::from_secs(5))]
    #[tokio::test]
    async fn test_create_song(
        #[future] setup: (TempDir, Arc<Surreal<Db>>, MusicLibEventHandlerGuard),
    ) {
        let (music_lib, db, handler) = setup.await;

        // create a new song file in the watched directory
        let metadata = create_song_metadata(&music_lib, arb_song_case()()).unwrap();

        // wait (bounded by the test timeout) until the watcher has indexed it
        while Song::read_all(&db).await.unwrap().is_empty() {
            tokio::time::sleep(Duration::from_millis(100)).await;
        }

        let path = metadata.path.clone();
        let song = Song::read_by_path(&db, path).await.unwrap().unwrap();

        // the stored song carries the metadata of the created file
        assert_eq!(metadata, song.into());

        // stop the watcher before the directory is deleted
        handler.stop();
        music_lib.close().unwrap();
    }

    /// Renaming a song file keeps the same database record under the new path.
    #[rstest]
    #[timeout(Duration::from_secs(5))]
    #[tokio::test]
    async fn test_rename_song(
        #[future] setup: (TempDir, Arc<Surreal<Db>>, MusicLibEventHandlerGuard),
    ) {
        let (music_lib, db, handler) = setup.await;

        // create a new song file in the watched directory
        let metadata = create_song_metadata(&music_lib, arb_song_case()()).unwrap();

        // wait until the watcher has indexed it
        while Song::read_all(&db).await.unwrap().is_empty() {
            tokio::time::sleep(Duration::from_millis(100)).await;
        }

        let path = metadata.path.clone();
        let song = Song::read_by_path(&db, path.clone())
            .await
            .unwrap()
            .unwrap();

        assert_eq!(metadata, song.clone().into());

        // rename the file on disk
        let new_path = music_lib.path().join("new_song.mp3");
        std::fs::rename(&path, &new_path).unwrap();

        // wait until the new path is known to the database
        while Song::read_by_path(&db, new_path.clone())
            .await
            .unwrap()
            .is_none()
        {
            tokio::time::sleep(Duration::from_millis(100)).await;
        }

        // the record was updated in place rather than re-created
        let new_song = Song::read_by_path(&db, new_path.clone())
            .await
            .unwrap()
            .unwrap();
        assert_eq!(song.id, new_song.id);

        // stop the watcher before the directory is deleted
        handler.stop();
        music_lib.close().unwrap();
    }

    /// Rewrites the title tag of the audio file at `path` to `new_name`.
    fn modify_song_metadata(path: PathBuf, new_name: String) -> anyhow::Result<()> {
        use lofty::{file::TaggedFileExt, tag::Accessor};
        let mut tagged_file = lofty::probe::Probe::open(&path)?.read()?;
        let tag = tagged_file
            .primary_tag_mut()
            .ok_or(anyhow::anyhow!("ERROR: No tags found"))?;
        tag.set_title(new_name);
        tagged_file.save_to_path(&path, lofty::config::WriteOptions::default())?;
        Ok(())
    }

    /// Changing a song's tags on disk is reflected in the database.
    #[rstest]
    #[timeout(Duration::from_secs(5))]
    #[tokio::test]
    async fn test_modify_song(
        #[future] setup: (TempDir, Arc<Surreal<Db>>, MusicLibEventHandlerGuard),
    ) {
        let (music_lib, db, handler) = setup.await;

        // create a new song file in the watched directory
        let metadata = create_song_metadata(&music_lib, arb_song_case()()).unwrap();

        // wait until the watcher has indexed it
        while Song::read_all(&db).await.unwrap().is_empty() {
            tokio::time::sleep(Duration::from_millis(100)).await;
        }
        let path = metadata.path.clone();
        let song = Song::read_by_path(&db, path.clone())
            .await
            .unwrap()
            .unwrap();

        assert_eq!(metadata, song.clone().into());

        // change the title tag of the file
        modify_song_metadata(path.clone(), "new song name".to_string()).unwrap();

        // wait until the stored title matches the new tag
        while Song::read_by_path(&db, path.clone())
            .await
            .unwrap()
            .unwrap()
            .title
            != "new song name".into()
        {
            tokio::time::sleep(Duration::from_millis(100)).await;
        }

        // stop the watcher before the directory is deleted
        handler.stop();
        music_lib.close().unwrap();
    }

    /// Deleting a song file removes it from the database.
    #[rstest]
    #[timeout(Duration::from_secs(5))]
    #[tokio::test]
    async fn test_remove_song(
        #[future] setup: (TempDir, Arc<Surreal<Db>>, MusicLibEventHandlerGuard),
    ) {
        let (music_lib, db, handler) = setup.await;

        // create a new song file in the watched directory
        let metadata = create_song_metadata(&music_lib, arb_song_case()()).unwrap();

        // wait until the watcher has indexed it
        while Song::read_all(&db).await.unwrap().is_empty() {
            tokio::time::sleep(Duration::from_millis(100)).await;
        }
        let path = metadata.path.clone();
        let song = Song::read_by_path(&db, path.clone())
            .await
            .unwrap()
            .unwrap();

        assert_eq!(metadata, song.clone().into());

        // delete the file from disk
        std::fs::remove_file(&path).unwrap();

        // wait until the song is gone from the database
        while Song::read_by_path(&db, path.clone())
            .await
            .unwrap()
            .is_some()
        {
            tokio::time::sleep(Duration::from_millis(100)).await;
        }

        // stop the watcher before the directory is deleted
        handler.stop();
        music_lib.close().unwrap();
    }

    /// Creating and then removing an empty folder is tolerated by the watcher.
    #[rstest]
    #[tokio::test]
    async fn test_remove_empty_folder(
        #[future] setup: (TempDir, Arc<Surreal<Db>>, MusicLibEventHandlerGuard),
    ) {
        let (music_lib, _, handler) = setup.await;

        // create an empty folder in the music library
        let empty_folder = music_lib.path().join("empty_folder");
        std::fs::create_dir(&empty_folder).unwrap();

        // give the watcher time to see the creation
        tokio::time::sleep(Duration::from_secs(1)).await;

        // remove the folder while the watcher is still running, so that the
        // folder-removal event is actually delivered to the handler
        std::fs::remove_dir(&empty_folder).unwrap();

        // give the watcher time to see the removal
        tokio::time::sleep(Duration::from_secs(1)).await;

        // stop the watcher before the directory is deleted
        handler.stop();
        music_lib.close().unwrap();
    }
}