1use std::{
2 collections::{HashMap, HashSet},
3 num::NonZeroUsize,
4 path::PathBuf,
5 time::Duration,
6};
7
8use log::{debug, error, info, warn};
9use mecomp_analysis::{
10 clustering::{ClusteringHelper, KOptimal},
11 decoder::{Decoder, MecompDecoder},
12 embeddings::ModelConfig,
13};
14use mecomp_core::config::{AnalysisKind, AnalysisSettings, ReclusterSettings};
15use mecomp_prost::{LibraryBrief, LibraryFull, LibraryHealth};
16use one_or_many::OneOrMany;
17use surrealdb::{Connection, Surreal};
18use tracing::{Instrument, instrument};
19use walkdir::WalkDir;
20
21use mecomp_storage::{
22 db::{
23 health::{
24 count_albums, count_artists, count_collections, count_dynamic_playlists,
25 count_orphaned_albums, count_orphaned_artists, count_orphaned_collections,
26 count_orphaned_playlists, count_playlists, count_songs, count_unanalyzed_songs,
27 },
28 schemas::{
29 album::Album,
30 analysis::Analysis,
31 artist::Artist,
32 collection::Collection,
33 dynamic::DynamicPlaylist,
34 playlist::Playlist,
35 song::{Song, SongMetadata},
36 },
37 },
38 errors::Error,
39 util::MetadataConflictResolution,
40};
41
42use crate::termination::InterruptReceiver;
43
44#[instrument]
52#[inline]
53pub async fn rescan<C: Connection>(
54 db: &Surreal<C>,
55 paths: &[PathBuf],
56 artist_name_separator: &OneOrMany<String>,
57 protected_artist_names: &OneOrMany<String>,
58 genre_separator: Option<&str>,
59 conflict_resolution_mode: MetadataConflictResolution,
60) -> Result<(), Error> {
61 let mut visited_paths = check_library(
63 db,
64 artist_name_separator,
65 protected_artist_names,
66 genre_separator,
67 conflict_resolution_mode,
68 )
69 .await?;
70
71 index_new_songs(
73 db,
74 paths,
75 &mut visited_paths,
76 artist_name_separator,
77 protected_artist_names,
78 genre_separator,
79 )
80 .await?;
81
82 delete_orphans(db).await?;
84
85 info!("Library rescan complete");
86 info!("Library health: {:?}", health(db).await?);
87
88 Ok(())
89}
90
/// Validate every song already in the index against the files on disk.
///
/// For each indexed song:
/// - if the file no longer exists, the song is deleted from the database;
/// - if the on-disk metadata differs from the indexed metadata, the conflict
///   is resolved per `conflict_resolution_mode` (overwrite the index or skip);
/// - if the metadata cannot be read at all, the file is assumed to no longer
///   be a valid song and the record is deleted.
///
/// Returns the set of paths that were examined (files that still exist on
/// disk, readable or not), so the caller can avoid re-walking them when
/// looking for new songs.
#[instrument]
async fn check_library<C: Connection>(
    db: &Surreal<C>,
    artist_name_separator: &OneOrMany<String>,
    protected_artist_names: &OneOrMany<String>,
    genre_separator: Option<&str>,
    conflict_resolution_mode: MetadataConflictResolution,
) -> Result<HashSet<PathBuf>, Error> {
    let mut paths_to_skip = HashSet::new();

    let songs = Song::read_all(db).await?;
    for song in songs {
        let path = &song.path;
        if !path.exists() {
            // File is gone: drop the record and do NOT record the path, so a
            // later re-appearance of the file would be indexed as new.
            warn!("Song {} no longer exists, deleting", path.display());
            Song::delete(db, song.id).await?;
            continue;
        }

        debug!("loading metadata for {}", path.display());
        match SongMetadata::load_from_path(
            path.clone(),
            artist_name_separator,
            protected_artist_names,
            genre_separator,
        ) {
            // On-disk metadata disagrees with what the index stores.
            Ok(metadata) if metadata != SongMetadata::from(&song) => {
                let log_postfix = if conflict_resolution_mode == MetadataConflictResolution::Skip {
                    "but conflict resolution mode is \"skip\", so we do nothing"
                } else {
                    "resolving conflict"
                };
                info!(
                    "{} has conflicting metadata with index, {log_postfix}",
                    path.display(),
                );

                match conflict_resolution_mode {
                    MetadataConflictResolution::Overwrite => {
                        // Merge keeps song fields that metadata alone can't
                        // supply while taking the fresh on-disk values.
                        Song::update(db, song.id.clone(), metadata.merge_with_song(&song)).await?;
                    }
                    MetadataConflictResolution::Skip => {}
                }
            }
            Err(e) => {
                // Unreadable metadata: treat the file as no-longer-a-song and
                // remove it from the index. (Its path is still recorded below
                // so the indexing pass won't retry it.)
                warn!("Error reading metadata for {}: {e}", path.display());
                info!(
                    "assuming the file isn't a song or doesn't exist anymore, removing from library"
                );
                Song::delete(db, song.id).await?;
            }
            // Metadata matches the index: nothing to do.
            _ => {}
        }

        paths_to_skip.insert(path.clone());
    }

    Ok(paths_to_skip)
}
170
171#[instrument]
179async fn index_new_songs<C: Connection>(
180 db: &Surreal<C>,
181 paths: &[PathBuf],
182 visited_paths: &mut HashSet<PathBuf>,
183 artist_name_separator: &OneOrMany<String>,
184 protected_artist_names: &OneOrMany<String>,
185 genre_separator: Option<&str>,
186) -> Result<(), Error> {
187 debug!("Indexing paths: {paths:?}");
188
189 const BATCH_SIZE: usize = 100;
190 let mut metadata_batch = Vec::with_capacity(BATCH_SIZE);
191 let mut processed_count = 0;
192
193 for path in paths
194 .iter()
195 .filter_map(|p| {
196 p.canonicalize()
197 .inspect_err(|e| warn!("Error canonicalizing path: {e}"))
198 .ok()
199 })
200 .flat_map(|x| WalkDir::new(x).into_iter())
201 .filter_map(|x| x.inspect_err(|e| warn!("Error reading path: {e}")).ok())
202 .filter_map(|x| x.file_type().is_file().then_some(x))
203 .filter(|path| visited_paths.insert(path.path().to_owned()))
204 {
205 match SongMetadata::load_from_path(
207 path.path().to_path_buf(),
208 artist_name_separator,
209 protected_artist_names,
210 genre_separator,
211 ) {
212 Ok(metadata) => {
213 metadata_batch.push(metadata);
214
215 if metadata_batch.len() >= BATCH_SIZE {
217 match Song::bulk_load_into_db(db, &metadata_batch).await {
218 Ok(songs) => {
219 processed_count += songs.len();
220 info!("Indexed batch: {processed_count} songs total");
221 }
222 Err(e) => error!("Error indexing batch: {e}"),
223 }
224 metadata_batch.clear();
225 }
226 }
227 Err(e) => warn!("Error reading metadata for {}: {e}", path.path().display()),
228 }
229 }
230
231 if !metadata_batch.is_empty() {
233 match Song::bulk_load_into_db(db, &metadata_batch).await {
234 Ok(songs) => {
235 processed_count += songs.len();
236 info!("Indexed final batch: {processed_count} songs total");
237 }
238 Err(e) => error!("Error indexing final batch: {e}"),
239 }
240 }
241
242 info!("Finished indexing {processed_count} new songs");
243
244 Ok(())
245}
246
/// Remove albums, artists, collections, and playlists that no longer
/// reference any songs.
async fn delete_orphans<C: Connection>(db: &Surreal<C>) -> Result<(), Error> {
    // Each schema type exposes `delete_orphaned`; the macro only wires up the
    // tracing span and the "log only when something was deleted" check.
    // NOTE(review): `DynamicPlaylist` is intentionally(?) not pruned here —
    // confirm dynamic playlists cannot become orphaned.
    macro_rules! delete_orphans {
        ($model:ident, $db:expr) => {
            let orphans = $model::delete_orphaned($db)
                .instrument(tracing::info_span!(concat!(
                    "Deleting orphaned ",
                    stringify!($model)
                )))
                .await?;
            if !orphans.is_empty() {
                info!("Deleted orphaned {}: {orphans:?}", stringify!($model));
            }
        };
    }

    delete_orphans!(Album, db);
    delete_orphans!(Artist, db);
    delete_orphans!(Collection, db);
    delete_orphans!(Playlist, db);

    Ok(())
}
271
/// Compute and store acoustic analyses (feature vector + embedding) for songs.
///
/// When `overwrite` is true, all existing analyses are deleted first so the
/// entire library is re-analyzed; otherwise only songs without an analysis are
/// processed. Decoding/analysis runs on a blocking worker task; results stream
/// back over a bounded sync channel and are persisted one at a time.
/// Decoder-creation failure and per-song errors are logged, not returned.
#[instrument]
#[inline]
pub async fn analyze<C: Connection>(
    db: &Surreal<C>,
    mut interrupt: InterruptReceiver,
    overwrite: bool,
    settings: &AnalysisSettings,
    config: ModelConfig,
) -> Result<(), Error> {
    if overwrite {
        Analysis::delete_all(db)
            .instrument(tracing::info_span!("Deleting existing analyses"))
            .await?;
    }

    // Map each song's path back to its id so results (keyed by path) can be
    // attached to the correct record.
    let songs_to_analyze: Vec<Song> = Analysis::read_songs_without_analysis(db).await?;
    let paths = songs_to_analyze
        .into_iter()
        .map(|song| (song.path, song.id))
        .collect::<HashMap<_, _>>();

    let keys = paths.keys().cloned().collect::<Vec<_>>();

    // Bounded channel sized at twice the worker parallelism: keeps decoders
    // busy without buffering unbounded results in memory.
    const ONE: NonZeroUsize = NonZeroUsize::new(1).unwrap();
    let channel_buffer_size = 2 * settings
        .num_threads
        .unwrap_or_else(|| std::thread::available_parallelism().unwrap_or(ONE))
        .get();
    let (tx, rx) = std::sync::mpsc::sync_channel(channel_buffer_size);

    // Failure to build a decoder is treated as a logged no-op, not an error.
    let Ok(decoder) = MecompDecoder::new() else {
        error!("Error creating decoder");
        return Ok(());
    };

    // CPU-bound decoding happens off the async runtime.
    let num_threads = settings.num_threads;
    let handle = tokio::task::spawn_blocking(move || {
        if let Some(num) = num_threads {
            decoder.process_songs_with_cores(&keys, tx, num, config.clone())
        } else {
            decoder.process_songs(&keys, tx, config.clone())
        }
    });
    let abort = handle.abort_handle();

    // NOTE(review): iterating a std::sync::mpsc receiver inside an async block
    // blocks this executor thread between results instead of yielding to the
    // runtime — confirm this is acceptable on the runtime in use, or consider
    // a tokio channel / spawn_blocking for this drain loop.
    async {
        for (song_path, maybe_analysis, maybe_embedding) in rx {
            // Interrupt is only observed between results; a song mid-decode
            // still completes on the worker side.
            if interrupt.is_stopped() {
                info!("Analysis interrupted");
                break;
            }

            let displayable_path = song_path.display();
            let Some(song_id) = paths.get(&song_path) else {
                error!("No song id found for path: {displayable_path}");
                continue;
            };

            // Either half failing skips the song entirely — an Analysis row
            // always has both features and embedding.
            let embedding = match maybe_embedding {
                Ok(embedding) => *embedding.inner(),
                Err(e) => {
                    error!("Error generating embedding for {displayable_path}: {e}");
                    continue;
                }
            };

            let features = match maybe_analysis {
                Ok(analysis) => *analysis.inner(),
                Err(e) => {
                    error!("Error generating analysis for {displayable_path}: {e}");
                    continue;
                }
            };

            if let Err(e) = Analysis::create(
                db,
                song_id.clone(),
                Analysis {
                    id: Analysis::generate_id(),
                    features,
                    embedding,
                },
            )
            .await
            {
                error!("Error saving analysis for {displayable_path}: {e}");
            } else {
                info!("Analyzed {displayable_path}");
            }
        }

        <Result<(), Error>>::Ok(())
    }
    .instrument(tracing::info_span!("Adding analyses to database"))
    .await?;

    // The drain loop above ends once the worker drops `tx` (or on interrupt),
    // so this select mostly reports the worker's final status — or aborts the
    // worker if an interrupt arrives first.
    tokio::select! {
        _ = interrupt.wait() => {
            info!("Analysis interrupted");
            abort.abort();
        }
        result = handle => match result {
            Ok(Ok(())) => {
                info!("Analysis complete");
                info!("Library health: {:?}", health(db).await?);
            }
            Ok(Err(e)) => {
                error!("Error analyzing songs: {e}");
            }
            Err(e) => {
                error!("Error joining task: {e}");
            }
        }
    }

    Ok(())
}
416
/// Re-derive the library's collections by clustering song analyses.
///
/// Reads every stored `Analysis`, clusters either the feature vectors or the
/// embeddings (per `analysis_settings.kind`) on a blocking worker, then
/// replaces ALL existing collections with one collection per non-empty
/// cluster. Clustering failures and interrupts end the operation early without
/// touching existing collections.
#[instrument]
#[inline]
pub async fn recluster<C: Connection>(
    db: &Surreal<C>,
    settings: ReclusterSettings,
    analysis_settings: &AnalysisSettings,
    mut interrupt: InterruptReceiver,
) -> Result<(), Error> {
    let samples = Analysis::read_all(db).await?;

    if samples.is_empty() {
        info!("No analyses found, nothing to recluster");
        return Ok(());
    }

    // Choose which vector space to cluster in: raw features or embeddings.
    let analysis_array = if matches!(analysis_settings.kind, AnalysisKind::Features) {
        samples
            .iter()
            .map(|analysis| analysis.features)
            .collect::<Vec<_>>()
            .into()
    } else {
        samples
            .iter()
            .map(|analysis| analysis.embedding)
            .collect::<Vec<_>>()
            .into()
    };

    // Clustering is CPU-bound; errors are logged and surface as `None` so the
    // caller of the closure can bail out quietly.
    let clustering = move || {
        let Ok(model) = ClusteringHelper::new(
            analysis_array,
            settings.max_clusters,
            KOptimal::GapStatistic {
                b: settings.gap_statistic_reference_datasets,
            },
            settings.algorithm.into(),
            settings.projection_method.into(),
        )
        .inspect_err(|e| error!("There was an error creating the clustering helper: {e}")) else {
            return None;
        };

        let Ok(Ok(model)) = model
            .initialize()
            .inspect_err(|e| error!("There was an error initializing the clustering helper: {e}"))
            .map(ClusteringHelper::cluster)
        else {
            return None;
        };

        Some(model)
    };

    let handle = tokio::task::spawn_blocking(clustering)
        .instrument(tracing::info_span!("Clustering library"));
    let abort = handle.inner().abort_handle();

    // Race the clustering worker against an interrupt; any failure path
    // returns Ok(()) so reclustering is best-effort.
    let model = tokio::select! {
        _ = interrupt.wait() => {
            info!("Reclustering interrupted");
            abort.abort();
            return Ok(());
        }
        result = handle => match result {
            Ok(Some(model)) => model,
            Ok(None) => {
                return Ok(());
            }
            Err(e) => {
                error!("Error joining task: {e}");
                return Ok(());
            }
        }
    };

    // Old collections are dropped only after clustering has succeeded.
    async {
        for collection in Collection::read_all(db).await? {
            Collection::delete(db, collection.id.clone()).await?;
        }

        <Result<(), Error>>::Ok(())
    }
    .instrument(tracing::info_span!("Deleting old collections"))
    .await?;

    async {
        let analysis_ids = samples.into_iter().map(|a| a.id).collect();
        let clusters = model.extract_analysis_clusters(analysis_ids);

        // One collection per non-empty cluster, named by cluster index.
        for (i, cluster) in clusters.into_iter().filter(|c| !c.is_empty()).enumerate() {
            let collection = Collection {
                id: Collection::generate_id(),
                name: format!("Collection {i}"),
                runtime: Duration::default(),
                song_count: Default::default(),
            };
            let collection = Collection::create(db, collection)
                .await?
                .ok_or(Error::NotCreated)?;

            async {
                let songs = Analysis::read_songs(db, cluster).await?;
                let song_ids = songs.into_iter().map(|s| s.id).collect::<Vec<_>>();

                Collection::add_songs(db, collection.id.clone(), song_ids).await?;

                <Result<(), Error>>::Ok(())
            }
            .instrument(tracing::info_span!("Adding songs to collection"))
            .await?;
        }
        Ok::<(), Error>(())
    }
    .instrument(tracing::info_span!("Creating new collections"))
    .await?;

    info!("Library recluster complete");
    info!("Library health: {:?}", health(db).await?);

    Ok(())
}
555
556#[instrument]
562#[inline]
563pub async fn brief<C: Connection>(db: &Surreal<C>) -> Result<LibraryBrief, Error> {
564 let artists = Artist::read_all_brief(db)
565 .await?
566 .into_iter()
567 .map(Into::into)
568 .collect();
569 let albums = Album::read_all_brief(db)
570 .await?
571 .into_iter()
572 .map(Into::into)
573 .collect();
574 let songs = Song::read_all_brief(db)
575 .await?
576 .into_iter()
577 .map(Into::into)
578 .collect();
579 let playlists = Playlist::read_all_brief(db)
580 .await?
581 .into_iter()
582 .map(Into::into)
583 .collect();
584 let collections = Collection::read_all_brief(db)
585 .await?
586 .into_iter()
587 .map(Into::into)
588 .collect();
589 let dynamic_playlists = DynamicPlaylist::read_all(db)
590 .await?
591 .into_iter()
592 .map(Into::into)
593 .collect();
594 Ok(LibraryBrief {
595 artists,
596 albums,
597 songs,
598 playlists,
599 collections,
600 dynamic_playlists,
601 })
602}
603
604#[instrument]
610#[inline]
611pub async fn full<C: Connection>(db: &Surreal<C>) -> Result<LibraryFull, Error> {
612 Ok(LibraryFull {
613 artists: Artist::read_all(db)
614 .await?
615 .into_iter()
616 .map(Into::into)
617 .collect(),
618 albums: Album::read_all(db)
619 .await?
620 .into_iter()
621 .map(Into::into)
622 .collect(),
623 songs: Song::read_all(db)
624 .await?
625 .into_iter()
626 .map(Into::into)
627 .collect(),
628 playlists: Playlist::read_all(db)
629 .await?
630 .into_iter()
631 .map(Into::into)
632 .collect(),
633 collections: Collection::read_all(db)
634 .await?
635 .into_iter()
636 .map(Into::into)
637 .collect(),
638 dynamic_playlists: DynamicPlaylist::read_all(db)
639 .await?
640 .into_iter()
641 .map(Into::into)
642 .collect(),
643 })
644}
645
646#[instrument]
654#[inline]
655pub async fn health<C: Connection>(db: &Surreal<C>) -> Result<LibraryHealth, Error> {
656 Ok(LibraryHealth {
657 artists: count_artists(db).await?,
658 albums: count_albums(db).await?,
659 songs: count_songs(db).await?,
660 unanalyzed_songs: Some(count_unanalyzed_songs(db).await?),
661 playlists: count_playlists(db).await?,
662 collections: count_collections(db).await?,
663 dynamic_playlists: count_dynamic_playlists(db).await?,
664 orphaned_artists: count_orphaned_artists(db).await?,
665 orphaned_albums: count_orphaned_albums(db).await?,
666 orphaned_playlists: count_orphaned_playlists(db).await?,
667 orphaned_collections: count_orphaned_collections(db).await?,
668 })
669}
670
#[cfg(test)]
mod tests {
    use super::*;
    use crate::test_utils::init;

    use mecomp_core::config::{ClusterAlgorithm, ProjectionMethod};
    use mecomp_storage::db::schemas::song::{SongChangeSet, SongMetadata};
    use mecomp_storage::test_utils::{
        ARTIST_NAME_SEPARATOR, SongCase, arb_feature_array, arb_song_case, arb_vec,
        create_song_metadata, create_song_with_overrides, init_test_database,
    };
    use one_or_many::OneOrMany;
    use pretty_assertions::assert_eq;
    use rstest::rstest;

    // End-to-end rescan: new files are indexed, songs with missing/invalid
    // files are removed, outdated metadata is overwritten, and artist/album
    // relations are wired up correctly.
    #[tokio::test]
    #[allow(clippy::too_many_lines)]
    async fn test_rescan() {
        init();
        let tempdir = tempfile::tempdir().unwrap();
        let db = init_test_database().await.unwrap();

        // A batch of valid song files on disk, not yet in the database.
        let song_cases = arb_vec(&arb_song_case(), 10..=15)();
        let metadatas = song_cases
            .into_iter()
            .map(|song_case| create_song_metadata(&tempdir, song_case))
            .collect::<Result<Vec<_>, _>>()
            .unwrap();
        // Indexed song whose file does not exist — should be deleted.
        let song_with_nonexistent_path = create_song_with_overrides(
            &db,
            arb_song_case()(),
            SongChangeSet {
                path: Some(tempdir.path().join("nonexistent.mp3")),
                ..Default::default()
            },
        )
        .await
        .unwrap();
        // Indexed song whose stored metadata (no genre) disagrees with the
        // file — should be overwritten on rescan.
        let mut metadata_of_song_with_outdated_metadata =
            create_song_metadata(&tempdir, arb_song_case()()).unwrap();
        metadata_of_song_with_outdated_metadata.genre = OneOrMany::None;
        let song_with_outdated_metadata =
            Song::try_load_into_db(&db, metadata_of_song_with_outdated_metadata)
                .await
                .unwrap();
        // Two non-song files on disk; the second is also indexed, so its
        // record should be removed when metadata loading fails.
        let invalid_song_path = tempdir.path().join("invalid1.mp3");
        std::fs::write(&invalid_song_path, "this is not a song").unwrap();
        let invalid_song_path = tempdir.path().join("invalid2.mp3");
        std::fs::write(&invalid_song_path, "this is not a song").unwrap();
        let song_with_invalid_metadata = create_song_with_overrides(
            &db,
            arb_song_case()(),
            SongChangeSet {
                path: Some(tempdir.path().join("invalid2.mp3")),
                ..Default::default()
            },
        )
        .await
        .unwrap();

        rescan(
            &db,
            &[tempdir.path().to_owned()],
            &ARTIST_NAME_SEPARATOR.to_string().into(),
            &OneOrMany::None,
            Some(ARTIST_NAME_SEPARATOR),
            MetadataConflictResolution::Overwrite,
        )
        .await
        .unwrap();

        // Songs with missing or unreadable files are gone.
        assert_eq!(
            Song::read(&db, song_with_nonexistent_path.id)
                .await
                .unwrap(),
            None
        );
        assert_eq!(
            Song::read(&db, song_with_invalid_metadata.id)
                .await
                .unwrap(),
            None
        );
        // The outdated record picked up the on-disk genre.
        assert!(
            Song::read(&db, song_with_outdated_metadata.id)
                .await
                .unwrap()
                .unwrap()
                .genre
                .is_some()
        );
        // Every valid file was indexed with its relations intact.
        for metadata in metadatas {
            let song = Song::read_by_path(&db, metadata.path.clone())
                .await
                .unwrap();
            assert!(song.is_some());
            let song = song.unwrap();

            assert_eq!(SongMetadata::from(&song), metadata);

            let artists = Artist::read_by_names(&db, Vec::from(metadata.artist.clone()))
                .await
                .unwrap();
            assert_eq!(artists.len(), metadata.artist.len());
            for artist in &artists {
                assert!(metadata.artist.contains(&artist.name));
                assert!(
                    Artist::read_songs(&db, artist.id.clone())
                        .await
                        .unwrap()
                        .contains(&song)
                );
            }
            if let Ok(song_artists) = Song::read_artist(&db, song.id.clone()).await {
                for artist in artists {
                    assert!(song_artists.contains(&artist));
                }
            } else {
                panic!("Error reading song artists");
            }

            let album = Album::read_by_name_and_album_artist(
                &db,
                &metadata.album,
                metadata.album_artist.clone(),
            )
            .await
            .unwrap();
            assert!(album.is_some());
            let album = album.unwrap();
            assert_eq!(
                Song::read_album(&db, song.id.clone()).await.unwrap(),
                Some(album.clone())
            );
            assert!(
                Album::read_songs(&db, album.id.clone())
                    .await
                    .unwrap()
                    .contains(&song)
            );

            let album_artists =
                Artist::read_by_names(&db, Vec::from(metadata.album_artist.clone()))
                    .await
                    .unwrap();
            assert_eq!(album_artists.len(), metadata.album_artist.len());
            for album_artist in album_artists {
                assert!(metadata.album_artist.contains(&album_artist.name));
                assert!(
                    Artist::read_albums(&db, album_artist.id.clone())
                        .await
                        .unwrap()
                        .contains(&album)
                );
            }
        }
    }

    // Orphans that existed before the rescan (song already deleted) are also
    // cleaned up by the orphan-pruning phase.
    #[tokio::test]
    async fn rescan_deletes_preexisting_orphans() {
        init();
        let tempdir = tempfile::tempdir().unwrap();
        let db = init_test_database().await.unwrap();

        let metadata = create_song_metadata(&tempdir, arb_song_case()()).unwrap();
        let song = Song::try_load_into_db(&db, metadata.clone()).await.unwrap();

        // Delete the file and the song record (without cascading), leaving
        // the album/artist orphaned before rescan even starts.
        std::fs::remove_file(&song.path).unwrap();
        Song::delete(&db, (song.id.clone(), false)).await.unwrap();

        rescan(
            &db,
            &[tempdir.path().to_owned()],
            &ARTIST_NAME_SEPARATOR.to_string().into(),
            &OneOrMany::None,
            Some(ARTIST_NAME_SEPARATOR),
            MetadataConflictResolution::Overwrite,
        )
        .await
        .unwrap();

        assert_eq!(Song::read_all(&db).await.unwrap().len(), 0);
        assert_eq!(Album::read_all(&db).await.unwrap().len(), 0);
        let artists = Artist::read_all(&db).await.unwrap();
        for artist in artists {
            assert_eq!(artist.album_count, 0);
            assert_eq!(artist.song_count, 0);
        }
        assert_eq!(Artist::read_all(&db).await.unwrap().len(), 0);
    }

    // When rescan itself deletes the song (file removed from disk), the
    // now-orphaned album and artist are deleted too.
    #[tokio::test]
    async fn rescan_deletes_orphaned_albums_and_artists() {
        init();
        let tempdir = tempfile::tempdir().unwrap();
        let db = init_test_database().await.unwrap();

        let metadata = create_song_metadata(&tempdir, arb_song_case()()).unwrap();
        let song = Song::try_load_into_db(&db, metadata.clone()).await.unwrap();
        let artist = Artist::read_by_names(&db, Vec::from(metadata.artist.clone()))
            .await
            .unwrap()
            .pop()
            .unwrap();
        let album = Album::read_by_name_and_album_artist(
            &db,
            &metadata.album,
            metadata.album_artist.clone(),
        )
        .await
        .unwrap()
        .unwrap();

        std::fs::remove_file(&song.path).unwrap();

        rescan(
            &db,
            &[tempdir.path().to_owned()],
            &ARTIST_NAME_SEPARATOR.to_string().into(),
            &OneOrMany::None,
            Some(ARTIST_NAME_SEPARATOR),
            MetadataConflictResolution::Overwrite,
        )
        .await
        .unwrap();

        assert_eq!(Artist::read(&db, artist.id.clone()).await.unwrap(), None);
        assert_eq!(Album::read(&db, album.id.clone()).await.unwrap(), None);
    }

    // Full analysis pass: every song gets exactly one analysis, and nearest-
    // neighbor queries return distinct neighbors excluding the query itself.
    #[tokio::test]
    async fn test_analyze() {
        init();
        let dir = tempfile::tempdir().unwrap();
        let db = init_test_database().await.unwrap();
        let interrupt = InterruptReceiver::dummy();
        let config = mecomp_analysis::embeddings::ModelConfig::default();
        let settings = AnalysisSettings::default();

        // Distinct `song` numbers keep the generated files unique.
        let song_cases = arb_vec(&arb_song_case(), 10..=15)();
        let song_cases = song_cases.into_iter().enumerate().map(|(i, sc)| SongCase {
            song: u8::try_from(i).unwrap(),
            ..sc
        });
        let metadatas = song_cases
            .into_iter()
            .map(|song_case| create_song_metadata(&dir, song_case))
            .collect::<Result<Vec<_>, _>>()
            .unwrap();
        for metadata in &metadatas {
            Song::try_load_into_db(&db, metadata.clone()).await.unwrap();
        }

        assert_eq!(
            Analysis::read_songs_without_analysis(&db)
                .await
                .unwrap()
                .len(),
            metadatas.len()
        );

        analyze(&db, interrupt, true, &settings, config)
            .await
            .unwrap();

        assert_eq!(
            Analysis::read_songs_without_analysis(&db)
                .await
                .unwrap()
                .len(),
            0
        );
        for metadata in &metadatas {
            let song = Song::read_by_path(&db, metadata.path.clone())
                .await
                .unwrap()
                .unwrap();
            let analysis = Analysis::read_for_song(&db, song.id.clone()).await.unwrap();
            assert!(analysis.is_some());
        }

        for analysis in Analysis::read_all(&db).await.unwrap() {
            let neighbors = Analysis::nearest_neighbors(&db, analysis.id.clone(), 100)
                .await
                .unwrap();
            assert!(!neighbors.contains(&analysis));
            assert_eq!(neighbors.len(), metadatas.len() - 1);
            // Neighbors are all distinct.
            assert_eq!(
                neighbors.len(),
                neighbors
                    .iter()
                    .map(|n| n.id.clone())
                    .collect::<HashSet<_>>()
                    .len()
            );
        }
    }

    // Reclustering with synthetic analyses produces at least one non-empty
    // collection, for each projection method.
    #[rstest]
    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn test_recluster(
        #[values(ProjectionMethod::TSne, ProjectionMethod::None, ProjectionMethod::Pca)]
        projection_method: ProjectionMethod,
    ) {
        init();
        let dir = tempfile::tempdir().unwrap();
        let db = init_test_database().await.unwrap();
        let settings = ReclusterSettings {
            gap_statistic_reference_datasets: 5,
            max_clusters: 18,
            algorithm: ClusterAlgorithm::GMM,
            projection_method,
        };
        let analysis_settings = AnalysisSettings::default();

        let song_cases = arb_vec(&arb_song_case(), 32..=32)();
        let song_cases = song_cases.into_iter().enumerate().map(|(i, sc)| SongCase {
            song: u8::try_from(i).unwrap(),
            ..sc
        });
        let metadatas = song_cases
            .into_iter()
            .map(|song_case| create_song_metadata(&dir, song_case))
            .collect::<Result<Vec<_>, _>>()
            .unwrap();
        let mut songs = Vec::with_capacity(metadatas.len());
        for metadata in &metadatas {
            songs.push(Song::try_load_into_db(&db, metadata.clone()).await.unwrap());
        }

        // Random analyses are good enough to exercise the clustering path.
        for song in &songs {
            Analysis::create(
                &db,
                song.id.clone(),
                Analysis {
                    id: Analysis::generate_id(),
                    features: arb_feature_array()(),
                    embedding: arb_feature_array()(),
                },
            )
            .await
            .unwrap();
        }

        recluster(
            &db,
            settings,
            &analysis_settings,
            InterruptReceiver::dummy(),
        )
        .await
        .unwrap();

        let collections = Collection::read_all(&db).await.unwrap();
        assert!(!collections.is_empty());
        for collection in collections {
            let songs = Collection::read_songs(&db, collection.id.clone())
                .await
                .unwrap();
            assert!(!songs.is_empty());
        }
    }

    // `brief` on an empty library returns empty lists.
    #[tokio::test]
    async fn test_brief() {
        init();
        let db = init_test_database().await.unwrap();
        let brief = brief(&db).await.unwrap();
        assert_eq!(brief.artists, Vec::default());
        assert_eq!(brief.albums, Vec::default());
        assert_eq!(brief.songs, Vec::default());
        assert_eq!(brief.playlists, Vec::default());
        assert_eq!(brief.collections, Vec::default());
    }

    // `full` on an empty library returns empty lists.
    #[tokio::test]
    async fn test_full() {
        init();
        let db = init_test_database().await.unwrap();
        let full = full(&db).await.unwrap();
        assert_eq!(full.artists.len(), 0);
        assert_eq!(full.albums.len(), 0);
        assert_eq!(full.songs.len(), 0);
        assert_eq!(full.playlists.len(), 0);
        assert_eq!(full.collections.len(), 0);
    }

    // `health` on an empty library reports all-zero counts.
    #[tokio::test]
    async fn test_health() {
        init();
        let db = init_test_database().await.unwrap();
        let health = health(&db).await.unwrap();
        assert_eq!(health.artists, 0);
        assert_eq!(health.albums, 0);
        assert_eq!(health.songs, 0);
        assert_eq!(health.unanalyzed_songs, Some(0));
        assert_eq!(health.playlists, 0);
        assert_eq!(health.collections, 0);
        assert_eq!(health.orphaned_artists, 0);
        assert_eq!(health.orphaned_albums, 0);
        assert_eq!(health.orphaned_playlists, 0);
        assert_eq!(health.orphaned_collections, 0);
    }
}