//! Library management services for the MECOMP daemon: rescanning the music
//! library against the filesystem, analyzing songs, reclustering collections,
//! and reporting library contents and health.
1use std::{
2    collections::{HashMap, HashSet},
3    num::NonZeroUsize,
4    path::PathBuf,
5    time::Duration,
6};
7
8use log::{debug, error, info, warn};
9use mecomp_analysis::{
10    clustering::{ClusteringHelper, KOptimal},
11    decoder::{Decoder, MecompDecoder},
12    embeddings::ModelConfig,
13};
14use mecomp_core::config::{AnalysisKind, AnalysisSettings, ReclusterSettings};
15use mecomp_prost::{LibraryBrief, LibraryFull, LibraryHealth};
16use one_or_many::OneOrMany;
17use surrealdb::{Connection, Surreal};
18use tracing::{Instrument, instrument};
19use walkdir::WalkDir;
20
21use mecomp_storage::{
22    db::{
23        health::{
24            count_albums, count_artists, count_collections, count_dynamic_playlists,
25            count_orphaned_albums, count_orphaned_artists, count_orphaned_collections,
26            count_orphaned_playlists, count_playlists, count_songs, count_unanalyzed_songs,
27        },
28        schemas::{
29            album::Album,
30            analysis::Analysis,
31            artist::Artist,
32            collection::Collection,
33            dynamic::DynamicPlaylist,
34            playlist::Playlist,
35            song::{Song, SongMetadata},
36        },
37    },
38    errors::Error,
39    util::MetadataConflictResolution,
40};
41
42use crate::termination::InterruptReceiver;
43
44/// Index the library.
45///
46/// # Errors
47///
48/// This function will return an error if there is an error reading from the database.
49/// or if there is an error reading from the file system.
50/// or if there is an error writing to the database.
51#[instrument]
52#[inline]
53pub async fn rescan<C: Connection>(
54    db: &Surreal<C>,
55    paths: &[PathBuf],
56    artist_name_separator: &OneOrMany<String>,
57    protected_artist_names: &OneOrMany<String>,
58    genre_separator: Option<&str>,
59    conflict_resolution_mode: MetadataConflictResolution,
60) -> Result<(), Error> {
61    // for each song, check if the file still exists
62    let mut visited_paths = check_library(
63        db,
64        artist_name_separator,
65        protected_artist_names,
66        genre_separator,
67        conflict_resolution_mode,
68    )
69    .await?;
70
71    // now, index all the songs in the library that haven't been indexed yet
72    index_new_songs(
73        db,
74        paths,
75        &mut visited_paths,
76        artist_name_separator,
77        protected_artist_names,
78        genre_separator,
79    )
80    .await?;
81
82    // find and delete any remaining orphaned albums and artists
83    delete_orphans(db).await?;
84
85    info!("Library rescan complete");
86    info!("Library health: {:?}", health(db).await?);
87
88    Ok(())
89}
90
91/// Check library for missing or updated songs.
92///
93/// # Returns
94///
95/// A set of paths already present in the library, which should be skipped during indexing.
96///
97/// # Errors
98///
99/// This function will return an error if there is an error reading from the database.
100#[instrument]
101async fn check_library<C: Connection>(
102    db: &Surreal<C>,
103    artist_name_separator: &OneOrMany<String>,
104    protected_artist_names: &OneOrMany<String>,
105    genre_separator: Option<&str>,
106    conflict_resolution_mode: MetadataConflictResolution,
107) -> Result<HashSet<PathBuf>, Error> {
108    // use a hashset because hashing is faster than linear search, especially for large libraries
109    // though a trie could be even faster
110    let mut paths_to_skip = HashSet::new();
111
112    let songs = Song::read_all(db).await?;
113    for song in songs {
114        let path = &song.path;
115        if !path.exists() {
116            // remove the song from the library
117            warn!("Song {} no longer exists, deleting", path.display());
118            Song::delete(db, song.id).await?;
119            continue;
120        }
121
122        debug!("loading metadata for {}", path.display());
123        // check if the metadata of the file is the same as the metadata in the database
124        match SongMetadata::load_from_path(
125            path.clone(),
126            artist_name_separator,
127            protected_artist_names,
128            genre_separator,
129        ) {
130            // if we have metadata and the metadata is different from the song's metadata, and ...
131            Ok(metadata) if metadata != SongMetadata::from(&song) => {
132                let log_postfix = if conflict_resolution_mode == MetadataConflictResolution::Skip {
133                    "but conflict resolution mode is \"skip\", so we do nothing"
134                } else {
135                    "resolving conflict"
136                };
137                info!(
138                    "{} has conflicting metadata with index, {log_postfix}",
139                    path.display(),
140                );
141
142                match conflict_resolution_mode {
143                    // ... we are in "overwrite" mode, update the song's metadata
144                    MetadataConflictResolution::Overwrite => {
145                        // if the file has been modified, update the song's metadata
146                        Song::update(db, song.id.clone(), metadata.merge_with_song(&song)).await?;
147                    }
148                    // ... we are in "skip" mode, do nothing
149                    MetadataConflictResolution::Skip => {}
150                }
151            }
152            // if we have an error, delete the song from the library
153            Err(e) => {
154                warn!("Error reading metadata for {}: {e}", path.display());
155                info!(
156                    "assuming the file isn't a song or doesn't exist anymore, removing from library"
157                );
158                Song::delete(db, song.id).await?;
159            }
160            // if the metadata is the same, do nothing
161            _ => {}
162        }
163
164        // now, add the path to the list of paths to skip so that we don't index the song again
165        paths_to_skip.insert(path.clone());
166    }
167
168    Ok(paths_to_skip)
169}
170
171/// Index all new songs in the given paths.
172///
173/// This expects to be passed as input, the output of `check_library`
174///
175/// # Errors
176///
177/// This function will return an error if there is an error reading from the database.
178#[instrument]
179async fn index_new_songs<C: Connection>(
180    db: &Surreal<C>,
181    paths: &[PathBuf],
182    visited_paths: &mut HashSet<PathBuf>,
183    artist_name_separator: &OneOrMany<String>,
184    protected_artist_names: &OneOrMany<String>,
185    genre_separator: Option<&str>,
186) -> Result<(), Error> {
187    debug!("Indexing paths: {paths:?}");
188
189    const BATCH_SIZE: usize = 100;
190    let mut metadata_batch = Vec::with_capacity(BATCH_SIZE);
191    let mut processed_count = 0;
192
193    for path in paths
194        .iter()
195        .filter_map(|p| {
196            p.canonicalize()
197                .inspect_err(|e| warn!("Error canonicalizing path: {e}"))
198                .ok()
199        })
200        .flat_map(|x| WalkDir::new(x).into_iter())
201        .filter_map(|x| x.inspect_err(|e| warn!("Error reading path: {e}")).ok())
202        .filter_map(|x| x.file_type().is_file().then_some(x))
203        .filter(|path| visited_paths.insert(path.path().to_owned()))
204    {
205        // Load metadata from file
206        match SongMetadata::load_from_path(
207            path.path().to_path_buf(),
208            artist_name_separator,
209            protected_artist_names,
210            genre_separator,
211        ) {
212            Ok(metadata) => {
213                metadata_batch.push(metadata);
214
215                // Process batch when full
216                if metadata_batch.len() >= BATCH_SIZE {
217                    match Song::bulk_load_into_db(db, &metadata_batch).await {
218                        Ok(songs) => {
219                            processed_count += songs.len();
220                            info!("Indexed batch: {processed_count} songs total");
221                        }
222                        Err(e) => error!("Error indexing batch: {e}"),
223                    }
224                    metadata_batch.clear();
225                }
226            }
227            Err(e) => warn!("Error reading metadata for {}: {e}", path.path().display()),
228        }
229    }
230
231    // Process remaining songs in partial batch
232    if !metadata_batch.is_empty() {
233        match Song::bulk_load_into_db(db, &metadata_batch).await {
234            Ok(songs) => {
235                processed_count += songs.len();
236                info!("Indexed final batch: {processed_count} songs total");
237            }
238            Err(e) => error!("Error indexing final batch: {e}"),
239        }
240    }
241
242    info!("Finished indexing {processed_count} new songs");
243
244    Ok(())
245}
246
247/// Clean up orphaned items in the library.
248async fn delete_orphans<C: Connection>(db: &Surreal<C>) -> Result<(), Error> {
249    // find and delete any remaining orphaned albums and artists
250    macro_rules! delete_orphans {
251        ($model:ident, $db:expr) => {
252            let orphans = $model::delete_orphaned($db)
253                .instrument(tracing::info_span!(concat!(
254                    "Deleting orphaned ",
255                    stringify!($model)
256                )))
257                .await?;
258            if !orphans.is_empty() {
259                info!("Deleted orphaned {}: {orphans:?}", stringify!($model));
260            }
261        };
262    }
263
264    delete_orphans!(Album, db);
265    delete_orphans!(Artist, db);
266    delete_orphans!(Collection, db);
267    delete_orphans!(Playlist, db);
268
269    Ok(())
270}
271
/// Analyze the library.
///
/// In order, this function will:
/// - if `overwrite` is true, delete all existing analyses.
/// - get all the songs that aren't currently analyzed.
/// - start analyzing those songs in batches.
/// - update the database with the analyses.
///
/// # Errors
///
/// This function will return an error if there is an error reading from the database.
///
/// # Panics
///
/// This function will panic if the thread(s) that analyzes the songs panics.
#[instrument]
#[inline]
pub async fn analyze<C: Connection>(
    db: &Surreal<C>,
    mut interrupt: InterruptReceiver,
    overwrite: bool,
    settings: &AnalysisSettings,
    config: ModelConfig,
) -> Result<(), Error> {
    if overwrite {
        // delete all the analyses, so every song is re-analyzed from scratch
        Analysis::delete_all(db)
            .instrument(tracing::info_span!("Deleting existing analyses"))
            .await?;
    }

    // get all the songs that don't have an analysis
    let songs_to_analyze: Vec<Song> = Analysis::read_songs_without_analysis(db).await?;
    // create a hashmap mapping paths to song ids, so analysis results (which
    // arrive keyed by path) can be matched back to their song records
    let paths = songs_to_analyze
        .into_iter()
        .map(|song| (song.path, song.id))
        .collect::<HashMap<_, _>>();

    let keys = paths.keys().cloned().collect::<Vec<_>>();

    // Use a bounded channel to apply backpressure on the producer.
    // This prevents unbounded memory growth when analysis is faster than database writes.
    // The buffer size is set to 2x the number of threads to allow some buffering
    // while still limiting memory usage.
    const ONE: NonZeroUsize = NonZeroUsize::new(1).unwrap();
    let channel_buffer_size = 2 * settings
        .num_threads
        .unwrap_or_else(|| std::thread::available_parallelism().unwrap_or(ONE))
        .get();
    let (tx, rx) = std::sync::mpsc::sync_channel(channel_buffer_size);

    // NOTE(review): a decoder-construction failure is logged but reported as
    // success (`Ok(())`) — confirm callers don't need to distinguish this case.
    let Ok(decoder) = MecompDecoder::new() else {
        error!("Error creating decoder");
        return Ok(());
    };

    // analyze the songs in batches, this is a blocking operation, so it runs
    // on the blocking thread pool; results arrive on `rx` as they are produced
    let num_threads = settings.num_threads;
    let handle = tokio::task::spawn_blocking(move || {
        if let Some(num) = num_threads {
            decoder.process_songs_with_cores(&keys, tx, num, config.clone())
        } else {
            decoder.process_songs(&keys, tx, config.clone())
        }
    });
    let abort = handle.abort_handle();

    async {
        // NOTE(review): iterating a `std::sync::mpsc` receiver blocks the
        // current executor thread between messages while inside this async
        // block — presumably acceptable for this daemon, but worth confirming.
        for (song_path, maybe_analysis, maybe_embedding) in rx {
            // the interrupt is only observed between messages, so an
            // in-flight database write always completes before we stop
            if interrupt.is_stopped() {
                info!("Analysis interrupted");
                break;
            }

            let displayable_path = song_path.display();
            let Some(song_id) = paths.get(&song_path) else {
                error!("No song id found for path: {displayable_path}");
                continue;
            };

            // handle errors in embedding generation (skip just this song)
            let embedding = match maybe_embedding {
                Ok(embedding) => *embedding.inner(),
                Err(e) => {
                    error!("Error generating embedding for {displayable_path}: {e}");
                    continue;
                }
            };

            // handle errors in analysis generation (skip just this song)
            let features = match maybe_analysis {
                Ok(analysis) => *analysis.inner(),
                Err(e) => {
                    error!("Error generating analysis for {displayable_path}: {e}");
                    continue;
                }
            };

            // persist the analysis; a failure here affects only this song
            if let Err(e) = Analysis::create(
                db,
                song_id.clone(),
                Analysis {
                    id: Analysis::generate_id(),
                    features,
                    embedding,
                },
            )
            .await
            {
                error!("Error saving analysis for {displayable_path}: {e}");
            } else {
                info!("Analyzed {displayable_path}");
            }
        }

        <Result<(), Error>>::Ok(())
    }
    .instrument(tracing::info_span!("Adding analyses to database"))
    .await?;

    // the consumer loop above has finished (channel closed or interrupted);
    // now settle the producer task, aborting it if an interrupt fires first
    tokio::select! {
        // wait for the interrupt signal
        _ = interrupt.wait() => {
            info!("Analysis interrupted");
            abort.abort();
        }
        // wait for the analysis to finish
        result = handle => match result {
            Ok(Ok(())) => {
                info!("Analysis complete");
                info!("Library health: {:?}", health(db).await?);
            }
            Ok(Err(e)) => {
                error!("Error analyzing songs: {e}");
            }
            Err(e) => {
                error!("Error joining task: {e}");
            }
        }
    }

    Ok(())
}
416
/// Recluster the library.
///
/// This function will remove and recompute all the "collections" (clusters) in the library.
///
/// # Errors
///
/// This function will return an error if there is an error reading from the database.
#[instrument]
#[inline]
pub async fn recluster<C: Connection>(
    db: &Surreal<C>,
    settings: ReclusterSettings,
    analysis_settings: &AnalysisSettings,
    mut interrupt: InterruptReceiver,
) -> Result<(), Error> {
    // collect all the analyses
    let samples = Analysis::read_all(db).await?;

    if samples.is_empty() {
        info!("No analyses found, nothing to recluster");
        return Ok(());
    }

    // choose which representation to cluster on, raw audio features or
    // embeddings, based on the configured analysis kind
    let analysis_array = if matches!(analysis_settings.kind, AnalysisKind::Features) {
        samples
            .iter()
            .map(|analysis| analysis.features)
            .collect::<Vec<_>>()
            .into()
    } else {
        samples
            .iter()
            .map(|analysis| analysis.embedding)
            .collect::<Vec<_>>()
            .into()
    };

    // use clustering algorithm to cluster the analyses; returns `None` on
    // any failure (errors are logged here, not propagated to the caller)
    let clustering = move || {
        let Ok(model) = ClusteringHelper::new(
            analysis_array,
            settings.max_clusters,
            KOptimal::GapStatistic {
                b: settings.gap_statistic_reference_datasets,
            },
            settings.algorithm.into(),
            settings.projection_method.into(),
        )
        .inspect_err(|e| error!("There was an error creating the clustering helper: {e}")) else {
            return None;
        };

        let Ok(Ok(model)) = model
            .initialize()
            .inspect_err(|e| error!("There was an error initializing the clustering helper: {e}"))
            .map(ClusteringHelper::cluster)
        else {
            return None;
        };

        Some(model)
    };

    // clustering is CPU-bound, so run it on the blocking thread pool
    let handle = tokio::task::spawn_blocking(clustering)
        .instrument(tracing::info_span!("Clustering library"));
    let abort = handle.inner().abort_handle();

    // wait for the clustering to finish, bailing out early on interrupt;
    // note that nothing has been deleted from the database yet at this point
    let model = tokio::select! {
        _ = interrupt.wait() => {
            info!("Reclustering interrupted");
            abort.abort();
            return Ok(());
        }
        result = handle => match result {
            Ok(Some(model)) => model,
            Ok(None) => {
                return Ok(());
            }
            Err(e) => {
                error!("Error joining task: {e}");
                return Ok(());
            }
        }
    };

    // delete all the collections (only after clustering has succeeded)
    async {
        // NOTE: For some reason, if a collection has too many songs, it will fail to delete with "DbError(Db(Tx("Max transaction entries limit exceeded")))"
        // (this was happening with 892 songs in a collection)
        for collection in Collection::read_all(db).await? {
            Collection::delete(db, collection.id.clone()).await?;
        }

        <Result<(), Error>>::Ok(())
    }
    .instrument(tracing::info_span!("Deleting old collections"))
    .await?;

    // get the clusters from the clustering and persist them as collections
    async {
        let analysis_ids = samples.into_iter().map(|a| a.id).collect();
        let clusters = model.extract_analysis_clusters(analysis_ids);

        // create the collections; empty clusters are filtered out before
        // enumeration, so collection names are numbered contiguously from 0
        for (i, cluster) in clusters.into_iter().filter(|c| !c.is_empty()).enumerate() {
            let collection = Collection {
                id: Collection::generate_id(),
                name: format!("Collection {i}"),
                runtime: Duration::default(),
                song_count: Default::default(),
            };
            let collection = Collection::create(db, collection)
                .await?
                .ok_or(Error::NotCreated)?;

            async {
                let songs = Analysis::read_songs(db, cluster).await?;
                let song_ids = songs.into_iter().map(|s| s.id).collect::<Vec<_>>();

                Collection::add_songs(db, collection.id.clone(), song_ids).await?;

                <Result<(), Error>>::Ok(())
            }
            .instrument(tracing::info_span!("Adding songs to collection"))
            .await?;
        }
        Ok::<(), Error>(())
    }
    .instrument(tracing::info_span!("Creating new collections"))
    .await?;

    info!("Library recluster complete");
    info!("Library health: {:?}", health(db).await?);

    Ok(())
}
555
556/// Get a brief overview of the library.
557///
558/// # Errors
559///
560/// This function will return an error if there is an error reading from the database.
561#[instrument]
562#[inline]
563pub async fn brief<C: Connection>(db: &Surreal<C>) -> Result<LibraryBrief, Error> {
564    let artists = Artist::read_all_brief(db)
565        .await?
566        .into_iter()
567        .map(Into::into)
568        .collect();
569    let albums = Album::read_all_brief(db)
570        .await?
571        .into_iter()
572        .map(Into::into)
573        .collect();
574    let songs = Song::read_all_brief(db)
575        .await?
576        .into_iter()
577        .map(Into::into)
578        .collect();
579    let playlists = Playlist::read_all_brief(db)
580        .await?
581        .into_iter()
582        .map(Into::into)
583        .collect();
584    let collections = Collection::read_all_brief(db)
585        .await?
586        .into_iter()
587        .map(Into::into)
588        .collect();
589    let dynamic_playlists = DynamicPlaylist::read_all(db)
590        .await?
591        .into_iter()
592        .map(Into::into)
593        .collect();
594    Ok(LibraryBrief {
595        artists,
596        albums,
597        songs,
598        playlists,
599        collections,
600        dynamic_playlists,
601    })
602}
603
604/// Get the full library.
605///
606/// # Errors
607///
608/// This function will return an error if there is an error reading from the database.
609#[instrument]
610#[inline]
611pub async fn full<C: Connection>(db: &Surreal<C>) -> Result<LibraryFull, Error> {
612    Ok(LibraryFull {
613        artists: Artist::read_all(db)
614            .await?
615            .into_iter()
616            .map(Into::into)
617            .collect(),
618        albums: Album::read_all(db)
619            .await?
620            .into_iter()
621            .map(Into::into)
622            .collect(),
623        songs: Song::read_all(db)
624            .await?
625            .into_iter()
626            .map(Into::into)
627            .collect(),
628        playlists: Playlist::read_all(db)
629            .await?
630            .into_iter()
631            .map(Into::into)
632            .collect(),
633        collections: Collection::read_all(db)
634            .await?
635            .into_iter()
636            .map(Into::into)
637            .collect(),
638        dynamic_playlists: DynamicPlaylist::read_all(db)
639            .await?
640            .into_iter()
641            .map(Into::into)
642            .collect(),
643    })
644}
645
646/// Get the health of the library.
647///
648/// This function will return the health of the library, including the number of orphaned items.
649///
650/// # Errors
651///
652/// This function will return an error if there is an error reading from the database.
653#[instrument]
654#[inline]
655pub async fn health<C: Connection>(db: &Surreal<C>) -> Result<LibraryHealth, Error> {
656    Ok(LibraryHealth {
657        artists: count_artists(db).await?,
658        albums: count_albums(db).await?,
659        songs: count_songs(db).await?,
660        unanalyzed_songs: Some(count_unanalyzed_songs(db).await?),
661        playlists: count_playlists(db).await?,
662        collections: count_collections(db).await?,
663        dynamic_playlists: count_dynamic_playlists(db).await?,
664        orphaned_artists: count_orphaned_artists(db).await?,
665        orphaned_albums: count_orphaned_albums(db).await?,
666        orphaned_playlists: count_orphaned_playlists(db).await?,
667        orphaned_collections: count_orphaned_collections(db).await?,
668    })
669}
670
671#[cfg(test)]
672mod tests {
673    use super::*;
674    use crate::test_utils::init;
675
676    use mecomp_core::config::{ClusterAlgorithm, ProjectionMethod};
677    use mecomp_storage::db::schemas::song::{SongChangeSet, SongMetadata};
678    use mecomp_storage::test_utils::{
679        ARTIST_NAME_SEPARATOR, SongCase, arb_feature_array, arb_song_case, arb_vec,
680        create_song_metadata, create_song_with_overrides, init_test_database,
681    };
682    use one_or_many::OneOrMany;
683    use pretty_assertions::assert_eq;
684    use rstest::rstest;
685
    // End-to-end test of `rescan`: new files are indexed, missing/invalid
    // files are deleted, and outdated metadata is refreshed (Overwrite mode).
    #[tokio::test]
    #[allow(clippy::too_many_lines)]
    async fn test_rescan() {
        init();
        let tempdir = tempfile::tempdir().unwrap();
        let db = init_test_database().await.unwrap();

        // populate the tempdir with songs that aren't in the database
        let song_cases = arb_vec(&arb_song_case(), 10..=15)();
        let metadatas = song_cases
            .into_iter()
            .map(|song_case| create_song_metadata(&tempdir, song_case))
            .collect::<Result<Vec<_>, _>>()
            .unwrap();
        // also make some songs that are in the database
        //  - a song whose file was deleted (the path never exists on disk)
        let song_with_nonexistent_path = create_song_with_overrides(
            &db,
            arb_song_case()(),
            SongChangeSet {
                path: Some(tempdir.path().join("nonexistent.mp3")),
                ..Default::default()
            },
        )
        .await
        .unwrap();
        //  - a song indexed with stale metadata (genre cleared before loading)
        let mut metadata_of_song_with_outdated_metadata =
            create_song_metadata(&tempdir, arb_song_case()()).unwrap();
        metadata_of_song_with_outdated_metadata.genre = OneOrMany::None;
        let song_with_outdated_metadata =
            Song::try_load_into_db(&db, metadata_of_song_with_outdated_metadata)
                .await
                .unwrap();
        // also add a "song" that can't be read (garbage bytes, not in the db)
        let invalid_song_path = tempdir.path().join("invalid1.mp3");
        std::fs::write(&invalid_song_path, "this is not a song").unwrap();
        // add another invalid song, this time also put it in the database
        let invalid_song_path = tempdir.path().join("invalid2.mp3");
        std::fs::write(&invalid_song_path, "this is not a song").unwrap();
        let song_with_invalid_metadata = create_song_with_overrides(
            &db,
            arb_song_case()(),
            SongChangeSet {
                path: Some(tempdir.path().join("invalid2.mp3")),
                ..Default::default()
            },
        )
        .await
        .unwrap();

        // rescan the library
        rescan(
            &db,
            &[tempdir.path().to_owned()],
            &ARTIST_NAME_SEPARATOR.to_string().into(),
            &OneOrMany::None,
            Some(ARTIST_NAME_SEPARATOR),
            MetadataConflictResolution::Overwrite,
        )
        .await
        .unwrap();

        // check that everything was done correctly
        // - `song_with_nonexistent_path` was deleted
        assert_eq!(
            Song::read(&db, song_with_nonexistent_path.id)
                .await
                .unwrap(),
            None
        );
        // - `song_with_invalid_metadata` was deleted
        assert_eq!(
            Song::read(&db, song_with_invalid_metadata.id)
                .await
                .unwrap(),
            None
        );
        // - `song_with_outdated_metadata` was updated (genre restored from file)
        assert!(
            Song::read(&db, song_with_outdated_metadata.id)
                .await
                .unwrap()
                .unwrap()
                .genre
                .is_some()
        );
        // - all the other songs were added
        //   and their artists, albums, and album_artists were added and linked correctly
        for metadata in metadatas {
            // the song was created
            let song = Song::read_by_path(&db, metadata.path.clone())
                .await
                .unwrap();
            assert!(song.is_some());
            let song = song.unwrap();

            // the song's metadata is correct
            assert_eq!(SongMetadata::from(&song), metadata);

            // the song's artists were created
            let artists = Artist::read_by_names(&db, Vec::from(metadata.artist.clone()))
                .await
                .unwrap();
            assert_eq!(artists.len(), metadata.artist.len());
            // the song is linked to the artists
            for artist in &artists {
                assert!(metadata.artist.contains(&artist.name));
                assert!(
                    Artist::read_songs(&db, artist.id.clone())
                        .await
                        .unwrap()
                        .contains(&song)
                );
            }
            // the artists are linked to the song
            if let Ok(song_artists) = Song::read_artist(&db, song.id.clone()).await {
                for artist in artists {
                    assert!(song_artists.contains(&artist));
                }
            } else {
                panic!("Error reading song artists");
            }

            // the song's album was created
            let album = Album::read_by_name_and_album_artist(
                &db,
                &metadata.album,
                metadata.album_artist.clone(),
            )
            .await
            .unwrap();
            assert!(album.is_some());
            let album = album.unwrap();
            // the song is linked to the album
            assert_eq!(
                Song::read_album(&db, song.id.clone()).await.unwrap(),
                Some(album.clone())
            );
            // the album is linked to the song
            assert!(
                Album::read_songs(&db, album.id.clone())
                    .await
                    .unwrap()
                    .contains(&song)
            );

            // the album's album artists were created
            let album_artists =
                Artist::read_by_names(&db, Vec::from(metadata.album_artist.clone()))
                    .await
                    .unwrap();
            assert_eq!(album_artists.len(), metadata.album_artist.len());
            // the album is linked to the album artists
            for album_artist in album_artists {
                assert!(metadata.album_artist.contains(&album_artist.name));
                assert!(
                    Artist::read_albums(&db, album_artist.id.clone())
                        .await
                        .unwrap()
                        .contains(&album)
                );
            }
        }
    }
850
    // Orphans that exist in the database *before* the rescan starts (song
    // already deleted, artist/album left dangling) must also be cleaned up.
    #[tokio::test]
    async fn rescan_deletes_preexisting_orphans() {
        init();
        let tempdir = tempfile::tempdir().unwrap();
        let db = init_test_database().await.unwrap();

        // create a song with an artist and an album
        let metadata = create_song_metadata(&tempdir, arb_song_case()()).unwrap();
        let song = Song::try_load_into_db(&db, metadata.clone()).await.unwrap();

        // delete the song, leaving orphaned artist and album
        // NOTE(review): the `false` in the tuple presumably disables cascade
        // cleanup so the orphans are left behind on purpose — confirm against
        // `Song::delete`'s signature.
        std::fs::remove_file(&song.path).unwrap();
        Song::delete(&db, (song.id.clone(), false)).await.unwrap();

        // rescan the library
        rescan(
            &db,
            &[tempdir.path().to_owned()],
            &ARTIST_NAME_SEPARATOR.to_string().into(),
            &OneOrMany::None,
            Some(ARTIST_NAME_SEPARATOR),
            MetadataConflictResolution::Overwrite,
        )
        .await
        .unwrap();

        // check that the album and artist were deleted
        assert_eq!(Song::read_all(&db).await.unwrap().len(), 0);
        assert_eq!(Album::read_all(&db).await.unwrap().len(), 0);
        // any artist still present must have no songs or albums
        let artists = Artist::read_all(&db).await.unwrap();
        for artist in artists {
            assert_eq!(artist.album_count, 0);
            assert_eq!(artist.song_count, 0);
        }
        assert_eq!(Artist::read_all(&db).await.unwrap().len(), 0);
    }
887
888    #[tokio::test]
889    async fn rescan_deletes_orphaned_albums_and_artists() {
890        init();
891        let tempdir = tempfile::tempdir().unwrap();
892        let db = init_test_database().await.unwrap();
893
894        // create a song with an artist and an album
895        let metadata = create_song_metadata(&tempdir, arb_song_case()()).unwrap();
896        let song = Song::try_load_into_db(&db, metadata.clone()).await.unwrap();
897        let artist = Artist::read_by_names(&db, Vec::from(metadata.artist.clone()))
898            .await
899            .unwrap()
900            .pop()
901            .unwrap();
902        let album = Album::read_by_name_and_album_artist(
903            &db,
904            &metadata.album,
905            metadata.album_artist.clone(),
906        )
907        .await
908        .unwrap()
909        .unwrap();
910
911        // delete the song, leaving orphaned artist and album
912        std::fs::remove_file(&song.path).unwrap();
913
914        // rescan the library
915        rescan(
916            &db,
917            &[tempdir.path().to_owned()],
918            &ARTIST_NAME_SEPARATOR.to_string().into(),
919            &OneOrMany::None,
920            Some(ARTIST_NAME_SEPARATOR),
921            MetadataConflictResolution::Overwrite,
922        )
923        .await
924        .unwrap();
925
926        // check that the artist and album were deleted
927        assert_eq!(Artist::read(&db, artist.id.clone()).await.unwrap(), None);
928        assert_eq!(Album::read(&db, album.id.clone()).await.unwrap(), None);
929    }
930
931    #[tokio::test]
932    async fn test_analyze() {
933        init();
934        let dir = tempfile::tempdir().unwrap();
935        let db = init_test_database().await.unwrap();
936        let interrupt = InterruptReceiver::dummy();
937        let config = mecomp_analysis::embeddings::ModelConfig::default();
938        let settings = AnalysisSettings::default();
939
940        // load some songs into the database
941        let song_cases = arb_vec(&arb_song_case(), 10..=15)();
942        let song_cases = song_cases.into_iter().enumerate().map(|(i, sc)| SongCase {
943            song: u8::try_from(i).unwrap(),
944            ..sc
945        });
946        let metadatas = song_cases
947            .into_iter()
948            .map(|song_case| create_song_metadata(&dir, song_case))
949            .collect::<Result<Vec<_>, _>>()
950            .unwrap();
951        for metadata in &metadatas {
952            Song::try_load_into_db(&db, metadata.clone()).await.unwrap();
953        }
954
955        // check that there are no analyses before.
956        assert_eq!(
957            Analysis::read_songs_without_analysis(&db)
958                .await
959                .unwrap()
960                .len(),
961            metadatas.len()
962        );
963
964        // analyze the library
965        analyze(&db, interrupt, true, &settings, config)
966            .await
967            .unwrap();
968
969        // check that all the songs have analyses
970        assert_eq!(
971            Analysis::read_songs_without_analysis(&db)
972                .await
973                .unwrap()
974                .len(),
975            0
976        );
977        for metadata in &metadatas {
978            let song = Song::read_by_path(&db, metadata.path.clone())
979                .await
980                .unwrap()
981                .unwrap();
982            let analysis = Analysis::read_for_song(&db, song.id.clone()).await.unwrap();
983            assert!(analysis.is_some());
984        }
985
986        // check that if we ask for the nearest neighbors of one of these songs, we get all the other songs
987        for analysis in Analysis::read_all(&db).await.unwrap() {
988            let neighbors = Analysis::nearest_neighbors(&db, analysis.id.clone(), 100)
989                .await
990                .unwrap();
991            assert!(!neighbors.contains(&analysis));
992            assert_eq!(neighbors.len(), metadatas.len() - 1);
993            assert_eq!(
994                neighbors.len(),
995                neighbors
996                    .iter()
997                    .map(|n| n.id.clone())
998                    .collect::<HashSet<_>>()
999                    .len()
1000            );
1001        }
1002    }
1003
1004    #[rstest]
1005    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
1006    async fn test_recluster(
1007        #[values(ProjectionMethod::TSne, ProjectionMethod::None, ProjectionMethod::Pca)]
1008        projection_method: ProjectionMethod,
1009    ) {
1010        init();
1011        let dir = tempfile::tempdir().unwrap();
1012        let db = init_test_database().await.unwrap();
1013        let settings = ReclusterSettings {
1014            gap_statistic_reference_datasets: 5,
1015            max_clusters: 18,
1016            algorithm: ClusterAlgorithm::GMM,
1017            projection_method,
1018        };
1019        let analysis_settings = AnalysisSettings::default();
1020
1021        // load some songs into the database
1022        let song_cases = arb_vec(&arb_song_case(), 32..=32)();
1023        let song_cases = song_cases.into_iter().enumerate().map(|(i, sc)| SongCase {
1024            song: u8::try_from(i).unwrap(),
1025            ..sc
1026        });
1027        let metadatas = song_cases
1028            .into_iter()
1029            .map(|song_case| create_song_metadata(&dir, song_case))
1030            .collect::<Result<Vec<_>, _>>()
1031            .unwrap();
1032        let mut songs = Vec::with_capacity(metadatas.len());
1033        for metadata in &metadatas {
1034            songs.push(Song::try_load_into_db(&db, metadata.clone()).await.unwrap());
1035        }
1036
1037        // load some dummy analyses into the database
1038        for song in &songs {
1039            Analysis::create(
1040                &db,
1041                song.id.clone(),
1042                Analysis {
1043                    id: Analysis::generate_id(),
1044                    features: arb_feature_array()(),
1045                    embedding: arb_feature_array()(),
1046                },
1047            )
1048            .await
1049            .unwrap();
1050        }
1051
1052        // recluster the library
1053        recluster(
1054            &db,
1055            settings,
1056            &analysis_settings,
1057            InterruptReceiver::dummy(),
1058        )
1059        .await
1060        .unwrap();
1061
1062        // check that there are collections
1063        let collections = Collection::read_all(&db).await.unwrap();
1064        assert!(!collections.is_empty());
1065        for collection in collections {
1066            let songs = Collection::read_songs(&db, collection.id.clone())
1067                .await
1068                .unwrap();
1069            assert!(!songs.is_empty());
1070        }
1071    }
1072
1073    #[tokio::test]
1074    async fn test_brief() {
1075        init();
1076        let db = init_test_database().await.unwrap();
1077        let brief = brief(&db).await.unwrap();
1078        assert_eq!(brief.artists, Vec::default());
1079        assert_eq!(brief.albums, Vec::default());
1080        assert_eq!(brief.songs, Vec::default());
1081        assert_eq!(brief.playlists, Vec::default());
1082        assert_eq!(brief.collections, Vec::default());
1083    }
1084
1085    #[tokio::test]
1086    async fn test_full() {
1087        init();
1088        let db = init_test_database().await.unwrap();
1089        let full = full(&db).await.unwrap();
1090        assert_eq!(full.artists.len(), 0);
1091        assert_eq!(full.albums.len(), 0);
1092        assert_eq!(full.songs.len(), 0);
1093        assert_eq!(full.playlists.len(), 0);
1094        assert_eq!(full.collections.len(), 0);
1095    }
1096
1097    #[tokio::test]
1098    async fn test_health() {
1099        init();
1100        let db = init_test_database().await.unwrap();
1101        let health = health(&db).await.unwrap();
1102        assert_eq!(health.artists, 0);
1103        assert_eq!(health.albums, 0);
1104        assert_eq!(health.songs, 0);
1105        assert_eq!(health.unanalyzed_songs, Some(0));
1106        assert_eq!(health.playlists, 0);
1107        assert_eq!(health.collections, 0);
1108        assert_eq!(health.orphaned_artists, 0);
1109        assert_eq!(health.orphaned_albums, 0);
1110        assert_eq!(health.orphaned_playlists, 0);
1111        assert_eq!(health.orphaned_collections, 0);
1112    }
1113}