mecomp_daemon/
lib.rs

1#![deny(clippy::missing_inline_in_public_items)]
2
3//----------------------------------------------------------------------------------------- std lib
4use std::{
5    net::{IpAddr, Ipv4Addr},
6    path::PathBuf,
7    sync::Arc,
8};
9//--------------------------------------------------------------------------------- other libraries
10use futures::{future, prelude::*};
11use log::{error, info};
12use persistence::QueueState;
13use surrealdb::{Surreal, engine::local::Db};
14use tarpc::{
15    self,
16    serde_transport::tcp,
17    server::{BaseChannel, Channel as _, incoming::Incoming as _},
18    tokio_serde::formats::Json,
19};
20use tokio::runtime::Handle;
21use tracing::Instrument;
22//-------------------------------------------------------------------------------- MECOMP libraries
23use mecomp_core::{
24    audio::{AudioKernelSender, commands::AudioCommand},
25    config::Settings,
26    is_server_running,
27    logger::{init_logger, init_tracing},
28    rpc::{MusicPlayer as _, MusicPlayerClient},
29    udp::{Message, Sender, StateChange},
30};
31use mecomp_storage::db::{init_database, set_database_path};
32
33async fn spawn<F>(fut: F)
34where
35    F: Future<Output = ()> + Send + 'static,
36{
37    if size_of::<F>() > 1024 {
38        // if the future is too big, box it before spawning
39        let fut = Box::pin(fut);
40        tokio::spawn(fut);
41    } else {
42        // if the future is small enough, spawn it directly
43        tokio::spawn(fut);
44    }
45}
46
47pub mod controller;
48#[cfg(feature = "dynamic_updates")]
49pub mod dynamic_updates;
50pub mod persistence;
51pub mod services;
52pub mod termination;
53#[cfg(test)]
54pub use mecomp_core::test_utils;
55
56use crate::{controller::MusicPlayerServer, termination::InterruptReceiver};
57
58/// The number of connections per IP address.
59const CHANNELS_PER_IP: u32 = 10;
60/// The maximum number of concurrent requests.
61pub const MAX_CONCURRENT_REQUESTS: usize = 4;
62
63/// Event Publisher guard
64///
65/// This is a newtype for the event publisher that ensures it is stopped when the guard is dropped.
66struct EventPublisher {
67    dispatcher: Arc<Sender<Message>>,
68    event_tx: std::sync::mpsc::Sender<StateChange>,
69    handle: tokio::task::JoinHandle<()>,
70}
71
72impl EventPublisher {
73    /// Start the event publisher
74    pub async fn new() -> Self {
75        let (event_tx, event_rx) = std::sync::mpsc::channel();
76        let event_publisher = Arc::new(Sender::new().await.unwrap());
77        let event_publisher_clone = event_publisher.clone();
78
79        let handle = tokio::task::spawn_blocking(move || {
80            while let Ok(event) = event_rx.recv() {
81                // re-enter the async context to send the event over UDP
82                Handle::current().block_on(async {
83                    if let Err(e) = event_publisher_clone
84                        .send(Message::StateChange(event))
85                        .await
86                    {
87                        error!("Failed to send event over UDP: {e}");
88                    }
89                });
90            }
91        })
92        .instrument(tracing::info_span!("event_publisher"));
93
94        Self {
95            dispatcher: event_publisher,
96            event_tx,
97            handle: handle.into_inner(),
98        }
99    }
100}
101
102impl Drop for EventPublisher {
103    fn drop(&mut self) {
104        // Stop the event publisher thread
105        self.handle.abort();
106    }
107}
108
109// TODO: at some point, we should probably add a panic handler to the daemon to ensure graceful shutdown.
110
111/// Run the daemon
112///
113/// also initializes the logger, database, and other necessary components.
114///
115/// # Arguments
116///
117/// * `settings` - The settings to use.
118/// * `db_dir` - The directory where the database is stored.
119///   If the directory does not exist, it will be created.
120/// * `log_file_path` - The path to the file where logs will be written.
121/// * `state_file_path` - The path to the file where the queue state restored from / saved to.
122///
123/// # Errors
124///
125/// If the daemon cannot be started, an error is returned.
126///
127/// # Panics
128///
129/// Panics if the peer address of the underlying TCP transport cannot be determined.
130#[inline]
131#[allow(clippy::redundant_pub_crate)]
132pub async fn start_daemon(
133    settings: Settings,
134    db_dir: PathBuf,
135    log_file_path: Option<PathBuf>,
136    state_file_path: Option<PathBuf>,
137) -> anyhow::Result<()> {
138    // Throw the given settings into an Arc so we can share settings across threads.
139    let settings = Arc::new(settings);
140
141    // check if a server is already running
142    if is_server_running(settings.daemon.rpc_port) {
143        anyhow::bail!(
144            "A server is already running on port {}",
145            settings.daemon.rpc_port
146        );
147    }
148
149    // Initialize the logger, database, and tracing.
150    init_logger(settings.daemon.log_level, log_file_path);
151    set_database_path(db_dir)?;
152    let db = Arc::new(init_database().await?);
153    tracing::subscriber::set_global_default(init_tracing())?;
154
155    // initialize the termination handler
156    let (terminator, interrupt_rx) = termination::create_termination();
157
158    // Start the music library watcher.
159    #[cfg(feature = "dynamic_updates")]
160    let guard = dynamic_updates::init_music_library_watcher(
161        db.clone(),
162        &settings.daemon.library_paths,
163        settings.daemon.artist_separator.clone(),
164        settings.daemon.protected_artist_names.clone(),
165        settings.daemon.genre_separator.clone(),
166        interrupt_rx.resubscribe(),
167    )?;
168
169    // initialize the event publisher
170    let event_publisher_guard = EventPublisher::new().await;
171
172    // Start the audio kernel.
173    let audio_kernel = AudioKernelSender::start(event_publisher_guard.event_tx.clone());
174
175    // optionally restore the queue state
176    if let Some(state_path) = &state_file_path {
177        info!("Restoring queue state from {}", state_path.display());
178        if let Err(e) =
179            QueueState::load_from_file(state_path).map(|state| state.restore_to(&audio_kernel))
180        {
181            error!("Failed to restore queue state: {e}");
182        }
183    }
184
185    // Initialize the server.
186    let server = MusicPlayerServer::new(
187        db.clone(),
188        settings.clone(),
189        audio_kernel.clone(),
190        event_publisher_guard.dispatcher.clone(),
191        terminator.clone(),
192        interrupt_rx.resubscribe(),
193    );
194
195    // Start the daemon server.
196    if let Err(e) = run_daemon(server, settings, interrupt_rx.resubscribe()).await {
197        error!("Error running daemon: {e}");
198    }
199
200    #[cfg(feature = "dynamic_updates")]
201    guard.stop();
202
203    // send a shutdown event to all clients (ignore errors)
204    let _ = event_publisher_guard
205        .dispatcher
206        .send(Message::Event(mecomp_core::udp::Event::DaemonShutdown))
207        .await;
208
209    if let Some(state_path) = &state_file_path {
210        info!("Persisting queue state to {}", state_path.display());
211        if let Err(e) = QueueState::retrieve(&audio_kernel)
212            .await
213            .and_then(|state| state.save_to_file(state_path))
214        {
215            error!("Failed to persist queue state: {e}");
216        }
217    }
218
219    log::info!("Cleanup complete, exiting...");
220
221    Ok(())
222}
223
224/// Run the daemon with the given settings, database, and state file path.
225/// Does not handle setup or teardown, just runs the server.
226async fn run_daemon(
227    server: MusicPlayerServer,
228    settings: Arc<Settings>,
229    mut interrupt_rx: InterruptReceiver,
230) -> anyhow::Result<()> {
231    // Start the RPC server.
232    let server_addr = (IpAddr::V4(Ipv4Addr::LOCALHOST), settings.daemon.rpc_port);
233
234    let mut listener = match tcp::listen(&server_addr, Json::default).await {
235        Ok(listener) => listener,
236        Err(e) => {
237            error!("Failed to start server: {e}");
238            return Err(anyhow::anyhow!("Failed to start server: {e}"));
239        }
240    };
241    info!("Listening on {}", listener.local_addr());
242    listener.config_mut().max_frame_length(usize::MAX);
243    let server_handle = listener
244        // Ignore accept errors.
245        .filter_map(|r| future::ready(r.ok()))
246        .map(BaseChannel::with_defaults)
247        // Limit channels to 10 per IP.
248        .max_channels_per_key(CHANNELS_PER_IP, |t| t.transport().peer_addr().unwrap().ip())
249        // Set up the server's handling of incoming connections.
250        // serve is generated by the service attribute.
251        // It takes as input any type implementing the generated MusicPlayer trait.
252        .map(|channel| channel.execute(server.clone().serve()).for_each(spawn))
253        // Max 4 channels.
254        // this means that we will only process 4 requests at a time
255        // NOTE: if we have issues with concurrency (e.g. deadlocks or data-races),
256        //       and have too much of a skill issue to fix it, we can set this number to 1.
257        .buffer_unordered(MAX_CONCURRENT_REQUESTS)
258        .for_each(async |()| {})
259        // make it fused so we can stop it later
260        .fuse();
261
262    // run the server until it is terminated
263    tokio::select! {
264        () = server_handle => {
265            error!("Server stopped unexpectedly");
266        },
267        // Wait for the server to be stopped.
268        // This will be triggered by the signal handler.
269        reason = interrupt_rx.wait() => {
270            match reason {
271                Ok(termination::Interrupted::UserInt) => info!("Stopping server per user request"),
272                Ok(termination::Interrupted::OsSigInt) => info!("Stopping server because of an os sig int"),
273                Ok(termination::Interrupted::OsSigTerm) => info!("Stopping server because of an os sig term"),
274                Ok(termination::Interrupted::OsSigQuit) => info!("Stopping server because of an os sig quit"),
275                Err(e) => error!("Stopping server because of an unexpected error: {e}"),
276            }
277        }
278    }
279
280    Ok(())
281}
282
283/// Initialize a test client, sends and receives messages over a channel / pipe.
284/// This is useful for testing the server without needing to start it.
285///
286/// # Errors
287///
288/// Errors if the event publisher cannot be created.
289#[inline]
290pub async fn init_test_client_server(
291    db: Arc<Surreal<Db>>,
292    settings: Arc<Settings>,
293    audio_kernel: Arc<AudioKernelSender>,
294) -> anyhow::Result<MusicPlayerClient> {
295    let (client_transport, server_transport) = tarpc::transport::channel::unbounded();
296
297    let event_publisher = Arc::new(Sender::new().await?);
298    // initialize the termination handler
299    let (terminator, mut interrupt_rx) = termination::create_termination();
300    #[allow(clippy::redundant_pub_crate)]
301    tokio::spawn(async move {
302        let server = MusicPlayerServer::new(
303            db,
304            settings,
305            audio_kernel.clone(),
306            event_publisher.clone(),
307            terminator,
308            interrupt_rx.resubscribe(),
309        );
310        tokio::select! {
311            () = tarpc::server::BaseChannel::with_defaults(server_transport)
312                .execute(server.serve())
313                // Handle all requests concurrently.
314                .for_each(async |response| {
315                    tokio::spawn(response);
316                }) => {},
317            // Wait for the server to be stopped.
318            _ = interrupt_rx.wait() => {
319                // Stop the server.
320                info!("Stopping server...");
321                audio_kernel.send(AudioCommand::Exit);
322                let _ = event_publisher.send(Message::Event(mecomp_core::udp::Event::DaemonShutdown)).await;
323                info!("Server stopped");
324            }
325        }
326    });
327
328    // MusicPlayerClient is generated by the #[tarpc::service] attribute. It has a constructor `new`
329    // that takes a config and any Transport as input.
330    Ok(MusicPlayerClient::new(tarpc::client::Config::default(), client_transport).spawn())
331}
332
333#[cfg(test)]
334mod test_client_tests {
335    //! Tests for:
336    //! - the `init_test_client_server` function
337    //! - daemon endpoints that aren't covered in other tests
338
339    use std::io::{Read, Write};
340
341    use super::*;
342    use anyhow::Result;
343    use mecomp_core::{
344        errors::{BackupError, SerializableLibraryError},
345        state::library::LibraryFull,
346    };
347    use mecomp_storage::{
348        db::schemas::{
349            collection::Collection,
350            dynamic::{DynamicPlaylist, DynamicPlaylistChangeSet, query::Query},
351            playlist::Playlist,
352            song::SongChangeSet,
353        },
354        test_utils::{SongCase, create_song_with_overrides, init_test_database},
355    };
356
357    use pretty_assertions::{assert_eq, assert_str_eq};
358    use rstest::{fixture, rstest};
359
360    #[fixture]
361    async fn db() -> Arc<Surreal<Db>> {
362        let db = Arc::new(init_test_database().await.unwrap());
363
364        // create a test song, add it to a playlist and collection
365
366        let song_case = SongCase::new(0, vec![0], vec![0], 0, 0);
367
368        // Call the create_song function
369        let song = create_song_with_overrides(
370            &db,
371            song_case,
372            SongChangeSet {
373                // need to specify overrides so that items are created in the db
374                artist: Some("Artist 0".to_string().into()),
375                album_artist: Some("Artist 0".to_string().into()),
376                album: Some("Album 0".into()),
377                path: Some("/path/to/song.mp3".into()),
378                ..Default::default()
379            },
380        )
381        .await
382        .unwrap();
383
384        // create a playlist with the song
385        let playlist = Playlist {
386            id: Playlist::generate_id(),
387            name: "Playlist 0".into(),
388            runtime: song.runtime,
389            song_count: 1,
390        };
391
392        let result = Playlist::create(&db, playlist).await.unwrap().unwrap();
393
394        Playlist::add_songs(&db, result.id, vec![song.id.clone()])
395            .await
396            .unwrap();
397
398        // create a collection with the song
399        let collection = Collection {
400            id: Collection::generate_id(),
401            name: "Collection 0".into(),
402            runtime: song.runtime,
403            song_count: 1,
404        };
405
406        let result = Collection::create(&db, collection).await.unwrap().unwrap();
407
408        Collection::add_songs(&db, result.id, vec![song.id])
409            .await
410            .unwrap();
411
412        return db;
413    }
414
415    #[fixture]
416    async fn client(#[future] db: Arc<Surreal<Db>>) -> MusicPlayerClient {
417        let settings = Arc::new(Settings::default());
418        let (tx, _) = std::sync::mpsc::channel();
419        let audio_kernel = AudioKernelSender::start(tx);
420
421        init_test_client_server(db.await, settings, audio_kernel)
422            .await
423            .unwrap()
424    }
425
426    #[tokio::test]
427    async fn test_init_test_client_server() {
428        let db = Arc::new(init_test_database().await.unwrap());
429        let settings = Arc::new(Settings::default());
430        let (tx, _) = std::sync::mpsc::channel();
431        let audio_kernel = AudioKernelSender::start(tx);
432
433        let client = init_test_client_server(db, settings, audio_kernel)
434            .await
435            .unwrap();
436
437        let ctx = tarpc::context::current();
438        let response = client.ping(ctx).await.unwrap();
439
440        assert_eq!(response, "pong");
441
442        // ensure that the client is shutdown properly
443        drop(client);
444    }
445
446    #[rstest]
447    #[tokio::test]
448    async fn test_library_song_get_artist(#[future] client: MusicPlayerClient) -> Result<()> {
449        let client = client.await;
450
451        let ctx = tarpc::context::current();
452        let library_full: LibraryFull = client.library_full(ctx).await??;
453
454        let ctx = tarpc::context::current();
455        let response = client
456            .library_song_get_artist(ctx, library_full.songs.first().unwrap().id.clone().into())
457            .await?;
458
459        assert_eq!(response, library_full.artists.into_vec().into());
460
461        Ok(())
462    }
463
464    #[rstest]
465    #[tokio::test]
466    async fn test_library_song_get_album(#[future] client: MusicPlayerClient) -> Result<()> {
467        let client = client.await;
468
469        let ctx = tarpc::context::current();
470        let library_full: LibraryFull = client.library_full(ctx).await??;
471
472        let ctx = tarpc::context::current();
473        let response = client
474            .library_song_get_album(ctx, library_full.songs.first().unwrap().id.clone().into())
475            .await?
476            .unwrap();
477
478        assert_eq!(response, library_full.albums.first().unwrap().clone());
479
480        Ok(())
481    }
482
483    #[rstest]
484    #[tokio::test]
485    async fn test_library_song_get_playlists(#[future] client: MusicPlayerClient) -> Result<()> {
486        let client = client.await;
487
488        let ctx = tarpc::context::current();
489        let library_full: LibraryFull = client.library_full(ctx).await??;
490
491        let ctx = tarpc::context::current();
492        let response = client
493            .library_song_get_playlists(ctx, library_full.songs.first().unwrap().id.clone().into())
494            .await?;
495
496        assert_eq!(response, library_full.playlists.into_vec().into());
497
498        Ok(())
499    }
500
501    #[rstest]
502    #[tokio::test]
503    async fn test_library_album_get_artist(#[future] client: MusicPlayerClient) -> Result<()> {
504        let client = client.await;
505
506        let ctx = tarpc::context::current();
507        let library_full: LibraryFull = client.library_full(ctx).await??;
508
509        let ctx = tarpc::context::current();
510        let response = client
511            .library_album_get_artist(ctx, library_full.albums.first().unwrap().id.clone().into())
512            .await?;
513
514        assert_eq!(response, library_full.artists.into_vec().into());
515
516        Ok(())
517    }
518
519    #[rstest]
520    #[tokio::test]
521    async fn test_library_album_get_songs(#[future] client: MusicPlayerClient) -> Result<()> {
522        let client = client.await;
523
524        let ctx = tarpc::context::current();
525        let library_full: LibraryFull = client.library_full(ctx).await??;
526
527        let ctx = tarpc::context::current();
528        let response = client
529            .library_album_get_songs(ctx, library_full.albums.first().unwrap().id.clone().into())
530            .await?
531            .unwrap();
532
533        assert_eq!(response, library_full.songs);
534
535        Ok(())
536    }
537
538    #[rstest]
539    #[tokio::test]
540    async fn test_library_artist_get_songs(#[future] client: MusicPlayerClient) -> Result<()> {
541        let client = client.await;
542
543        let ctx = tarpc::context::current();
544        let library_full: LibraryFull = client.library_full(ctx).await??;
545
546        let ctx = tarpc::context::current();
547        let response = client
548            .library_artist_get_songs(ctx, library_full.artists.first().unwrap().id.clone().into())
549            .await?
550            .unwrap();
551
552        assert_eq!(response, library_full.songs);
553
554        Ok(())
555    }
556
557    #[rstest]
558    #[tokio::test]
559    async fn test_library_artist_get_albums(#[future] client: MusicPlayerClient) -> Result<()> {
560        let client = client.await;
561
562        let ctx = tarpc::context::current();
563        let library_full: LibraryFull = client.library_full(ctx).await??;
564
565        let ctx = tarpc::context::current();
566        let response = client
567            .library_artist_get_albums(ctx, library_full.artists.first().unwrap().id.clone().into())
568            .await?
569            .unwrap();
570
571        assert_eq!(response, library_full.albums);
572
573        Ok(())
574    }
575
576    #[rstest]
577    #[tokio::test]
578    async fn test_playback_volume_toggle_mute(#[future] client: MusicPlayerClient) -> Result<()> {
579        let client = client.await;
580
581        let ctx = tarpc::context::current();
582
583        client.playback_volume_toggle_mute(ctx).await?;
584        Ok(())
585    }
586
587    #[rstest]
588    #[tokio::test]
589    async fn test_playback_stop(#[future] client: MusicPlayerClient) -> Result<()> {
590        let client = client.await;
591
592        let ctx = tarpc::context::current();
593
594        client.playback_stop(ctx).await?;
595        Ok(())
596    }
597
598    #[rstest]
599    #[tokio::test]
600    async fn test_queue_add_list(#[future] client: MusicPlayerClient) -> Result<()> {
601        let client = client.await;
602
603        let ctx = tarpc::context::current();
604        let library_full: LibraryFull = client.library_full(ctx).await??;
605
606        let ctx = tarpc::context::current();
607        let response = client
608            .queue_add_list(
609                ctx,
610                vec![library_full.songs.first().unwrap().id.clone().into()],
611            )
612            .await?;
613
614        assert_eq!(response, Ok(()));
615
616        Ok(())
617    }
618
619    #[rstest]
620    #[case::get(String::from("Playlist 0"))]
621    #[case::create(String::from("Playlist 1"))]
622    #[tokio::test]
623    async fn test_playlist_get_or_create(
624        #[future] client: MusicPlayerClient,
625        #[case] name: String,
626    ) -> Result<()> {
627        let client = client.await;
628
629        let ctx = tarpc::context::current();
630
631        // get or create the playlist
632        let playlist_id = client
633            .playlist_get_or_create(ctx, name.clone())
634            .await?
635            .unwrap();
636
637        // now get that playlist
638        let ctx = tarpc::context::current();
639        let playlist = client.playlist_get(ctx, playlist_id).await?.unwrap();
640
641        assert_eq!(playlist.name, name);
642
643        Ok(())
644    }
645
646    #[rstest]
647    #[tokio::test]
648    async fn test_playlist_clone(#[future] client: MusicPlayerClient) -> Result<()> {
649        let client = client.await;
650
651        let ctx = tarpc::context::current();
652        let library_full: LibraryFull = client.library_full(ctx).await??;
653
654        // clone the only playlist in the db
655        let ctx = tarpc::context::current();
656        let playlist_id = client
657            .playlist_clone(
658                ctx,
659                library_full.playlists.first().unwrap().id.clone().into(),
660            )
661            .await?
662            .unwrap();
663
664        // now get that playlist
665        let ctx = tarpc::context::current();
666        let playlist = client.playlist_get(ctx, playlist_id).await?.unwrap();
667
668        assert_eq!(playlist.name, "Playlist 0 (copy)");
669
670        Ok(())
671    }
672
673    #[rstest]
674    #[tokio::test]
675    async fn test_playlist_get_songs(#[future] client: MusicPlayerClient) -> Result<()> {
676        let client = client.await;
677
678        let ctx = tarpc::context::current();
679        let library_full: LibraryFull = client.library_full(ctx).await??;
680
681        // clone the only playlist in the db
682        let response = client
683            .playlist_get_songs(
684                ctx,
685                library_full.playlists.first().unwrap().id.clone().into(),
686            )
687            .await?
688            .unwrap();
689
690        assert_eq!(response, library_full.songs);
691
692        Ok(())
693    }
694
695    #[rstest]
696    #[tokio::test]
697    async fn test_playlist_rename(#[future] client: MusicPlayerClient) -> Result<()> {
698        let client = client.await;
699
700        let ctx = tarpc::context::current();
701        let library_full: LibraryFull = client.library_full(ctx).await??;
702
703        let target = library_full.playlists.first().unwrap();
704
705        let ctx = tarpc::context::current();
706        let response = client
707            .playlist_rename(ctx, target.id.clone().into(), "New Name".into())
708            .await?;
709
710        let expected = Playlist {
711            name: "New Name".into(),
712            ..target.clone()
713        };
714
715        assert_eq!(response, Ok(expected.clone()));
716
717        let ctx = tarpc::context::current();
718        let response = client
719            .playlist_get(ctx, target.id.clone().into())
720            .await?
721            .unwrap();
722
723        assert_eq!(response, expected);
724        Ok(())
725    }
726
727    #[rstest]
728    #[tokio::test]
729    async fn test_collection_get_songs(#[future] client: MusicPlayerClient) -> Result<()> {
730        let client = client.await;
731
732        let ctx = tarpc::context::current();
733        let library_full: LibraryFull = client.library_full(ctx).await??;
734
735        // clone the only playlist in the db
736        let response = client
737            .collection_get_songs(
738                ctx,
739                library_full.collections.first().unwrap().id.clone().into(),
740            )
741            .await?
742            .unwrap();
743
744        assert_eq!(response, library_full.songs);
745
746        Ok(())
747    }
748
749    #[rstest]
750    #[tokio::test]
751    async fn test_dynamic_playlist_create(#[future] client: MusicPlayerClient) -> Result<()> {
752        let client = client.await;
753
754        let ctx = tarpc::context::current();
755
756        let query: Query = "artist CONTAINS \"Artist 0\"".parse()?;
757
758        let response = client
759            .dynamic_playlist_create(ctx, "Dynamic Playlist 0".into(), query)
760            .await?;
761
762        assert!(response.is_ok());
763
764        Ok(())
765    }
766
767    #[rstest]
768    #[tokio::test]
769    async fn test_dynamic_playlist_list(#[future] client: MusicPlayerClient) -> Result<()> {
770        let client = client.await;
771
772        let ctx = tarpc::context::current();
773
774        let query: Query = "artist CONTAINS \"Artist 0\"".parse()?;
775
776        let dynamic_playlist_id = client
777            .dynamic_playlist_create(ctx, "Dynamic Playlist 0".into(), query)
778            .await?
779            .unwrap();
780
781        let ctx = tarpc::context::current();
782        let response = client.dynamic_playlist_list(ctx).await?;
783
784        assert_eq!(response.len(), 1);
785        assert_eq!(response.first().unwrap().id, dynamic_playlist_id.into());
786
787        Ok(())
788    }
789
790    #[rstest]
791    #[tokio::test]
792    async fn test_dynamic_playlist_update(#[future] client: MusicPlayerClient) -> Result<()> {
793        let client = client.await;
794
795        let ctx = tarpc::context::current();
796
797        let query: Query = "artist CONTAINS \"Artist 0\"".parse()?;
798
799        let dynamic_playlist_id = client
800            .dynamic_playlist_create(ctx, "Dynamic Playlist 0".into(), query.clone())
801            .await?
802            .unwrap();
803
804        let ctx = tarpc::context::current();
805        let response = client
806            .dynamic_playlist_update(
807                ctx,
808                dynamic_playlist_id.clone(),
809                DynamicPlaylistChangeSet::new().name("Dynamic Playlist 1"),
810            )
811            .await?;
812
813        let expected = DynamicPlaylist {
814            id: dynamic_playlist_id.clone().into(),
815            name: "Dynamic Playlist 1".into(),
816            query: query.clone(),
817        };
818
819        assert_eq!(response, Ok(expected.clone()));
820
821        let ctx = tarpc::context::current();
822        let response = client
823            .dynamic_playlist_get(ctx, dynamic_playlist_id)
824            .await?
825            .unwrap();
826
827        assert_eq!(response, expected);
828
829        Ok(())
830    }
831
832    #[rstest]
833    #[tokio::test]
834    async fn test_dynamic_playlist_remove(#[future] client: MusicPlayerClient) -> Result<()> {
835        let client = client.await;
836
837        let ctx = tarpc::context::current();
838
839        let query: Query = "artist CONTAINS \"Artist 0\"".parse()?;
840
841        let dynamic_playlist_id = client
842            .dynamic_playlist_create(ctx, "Dynamic Playlist 0".into(), query)
843            .await?
844            .unwrap();
845
846        let ctx = tarpc::context::current();
847        let response = client
848            .dynamic_playlist_remove(ctx, dynamic_playlist_id)
849            .await?;
850
851        assert_eq!(response, Ok(()));
852
853        let ctx = tarpc::context::current();
854        let response = client.dynamic_playlist_list(ctx).await?;
855
856        assert_eq!(response.len(), 0);
857
858        Ok(())
859    }
860
861    #[rstest]
862    #[tokio::test]
863    async fn test_dynamic_playlist_get(#[future] client: MusicPlayerClient) -> Result<()> {
864        let client = client.await;
865
866        let ctx = tarpc::context::current();
867
868        let query: Query = "artist CONTAINS \"Artist 0\"".parse()?;
869
870        let dynamic_playlist_id = client
871            .dynamic_playlist_create(ctx, "Dynamic Playlist 0".into(), query.clone())
872            .await?
873            .unwrap();
874
875        let ctx = tarpc::context::current();
876        let response = client
877            .dynamic_playlist_get(ctx, dynamic_playlist_id)
878            .await?
879            .unwrap();
880
881        assert_eq!(response.name, "Dynamic Playlist 0");
882        assert_eq!(response.query, query);
883
884        Ok(())
885    }
886
887    #[rstest]
888    #[tokio::test]
889    async fn test_dynamic_playlist_get_songs(#[future] client: MusicPlayerClient) -> Result<()> {
890        let client = client.await;
891
892        let ctx = tarpc::context::current();
893
894        let query: Query = "artist CONTAINS \"Artist 0\"".parse()?;
895
896        let dynamic_playlist_id = client
897            .dynamic_playlist_create(ctx, "Dynamic Playlist 0".into(), query)
898            .await?
899            .unwrap();
900
901        let ctx = tarpc::context::current();
902        let response = client
903            .dynamic_playlist_get_songs(ctx, dynamic_playlist_id)
904            .await?
905            .unwrap();
906
907        assert_eq!(response.len(), 1);
908
909        Ok(())
910    }
911
912    // Dynamic Playlist Import Tests
913    #[rstest]
914    #[tokio::test]
915    async fn test_dynamic_playlist_import(#[future] client: MusicPlayerClient) -> Result<()> {
916        let client = client.await;
917
918        let tmpfile = tempfile::NamedTempFile::with_suffix("dps.csv")?;
919
920        // write a csv file to the temp file
921        let mut file = tmpfile.reopen()?;
922        writeln!(file, "dynamic playlist name,query")?;
923        writeln!(file, "Dynamic Playlist 0,artist CONTAINS \"Artist 0\"")?;
924
925        let tmpfile_path = tmpfile.path().to_path_buf();
926
927        let query: Query = "artist CONTAINS \"Artist 0\"".parse()?;
928
929        let ctx = tarpc::context::current();
930        let response = client.dynamic_playlist_import(ctx, tmpfile_path).await??;
931
932        let expected = DynamicPlaylist {
933            id: response[0].id.clone(),
934            name: "Dynamic Playlist 0".into(),
935            query: query.clone(),
936        };
937
938        assert_eq!(response, vec![expected]);
939
940        Ok(())
941    }
942    #[rstest]
943    #[tokio::test]
944    async fn test_dynamic_playlist_import_file_nonexistent(
945        #[future] client: MusicPlayerClient,
946    ) -> Result<()> {
947        let client = client.await;
948
949        let tmpfile = tempfile::NamedTempFile::with_suffix("dps.csv")?;
950
951        // write a csv file to the temp file
952        let mut file = tmpfile.reopen()?;
953        writeln!(file, "artist,album,album_artist,title")?;
954
955        let tmpfile_path = "/this/path/does/not/exist.csv";
956
957        let ctx = tarpc::context::current();
958        let response = client
959            .dynamic_playlist_import(ctx, tmpfile_path.into())
960            .await?;
961        assert!(response.is_err(), "response: {response:?}");
962        assert_eq!(
963            response.unwrap_err().to_string(),
964            format!("Backup Error: The file \"{tmpfile_path}\" does not exist")
965        );
966        Ok(())
967    }
968    #[rstest]
969    #[tokio::test]
970    async fn test_dynamic_playlist_import_file_wrong_extension(
971        #[future] client: MusicPlayerClient,
972    ) -> Result<()> {
973        let client = client.await;
974
975        let tmpfile = tempfile::NamedTempFile::with_suffix("dps.txt")?;
976
977        // write a csv file to the temp file
978        let mut file = tmpfile.reopen()?;
979        writeln!(file, "artist,album,album_artist,title")?;
980
981        let tmpfile_path = tmpfile.path().to_path_buf();
982
983        let ctx = tarpc::context::current();
984        let response = client
985            .dynamic_playlist_import(ctx, tmpfile_path.clone())
986            .await?;
987        assert!(response.is_err(), "response: {response:?}");
988        assert_eq!(
989            response.unwrap_err().to_string(),
990            format!(
991                "Backup Error: The file \"{}\" has the wrong extension, expected: csv",
992                tmpfile_path.display()
993            )
994        );
995        Ok(())
996    }
997    #[rstest]
998    #[tokio::test]
999    async fn test_dynamic_playlist_import_file_is_directory(
1000        #[future] client: MusicPlayerClient,
1001    ) -> Result<()> {
1002        let client = client.await;
1003
1004        let tmpfile = tempfile::tempdir()?;
1005
1006        let tmpfile_path = tmpfile.path().to_path_buf();
1007
1008        let ctx = tarpc::context::current();
1009        let response = client
1010            .dynamic_playlist_import(ctx, tmpfile_path.clone())
1011            .await?;
1012        assert!(response.is_err());
1013        assert_eq!(
1014            response.unwrap_err().to_string(),
1015            format!(
1016                "Backup Error: {} is a directory, not a file",
1017                tmpfile_path.display()
1018            )
1019        );
1020        Ok(())
1021    }
1022    #[rstest]
1023    #[tokio::test]
1024    async fn test_dynamic_playlist_import_file_invalid_format(
1025        #[future] client: MusicPlayerClient,
1026    ) -> Result<()> {
1027        let client = client.await;
1028
1029        let tmpfile = tempfile::NamedTempFile::with_suffix("dps.csv")?;
1030
1031        // write a csv file to the temp file
1032        let mut file = tmpfile.reopen()?;
1033        writeln!(file, "artist,album,album_artist,title")?;
1034
1035        let tmpfile_path = tmpfile.path().to_path_buf();
1036
1037        let ctx = tarpc::context::current();
1038        let response = client.dynamic_playlist_import(ctx, tmpfile_path).await?;
1039        assert!(response.is_err());
1040        assert_eq!(
1041            response.unwrap_err().to_string(),
1042            "Backup Error: No valid playlists were found in the csv file."
1043        );
1044        Ok(())
1045    }
1046    #[rstest]
1047    #[tokio::test]
1048    async fn test_dynamic_playlist_import_file_invalid_query(
1049        #[future] client: MusicPlayerClient,
1050    ) -> Result<()> {
1051        let client = client.await;
1052
1053        let tmpfile = tempfile::NamedTempFile::with_suffix("dps.csv")?;
1054
1055        // write a csv file to the temp file
1056        let mut file = tmpfile.reopen()?;
1057        writeln!(file, "dynamic playlist name,query")?;
1058        writeln!(file, "Dynamic Playlist 0,artist CONTAINS \"Artist 0\"")?;
1059        writeln!(file, "Dynamic Playlist 1,artist CONTAINS \"")?;
1060
1061        let tmpfile_path = tmpfile.path().to_path_buf();
1062
1063        let ctx = tarpc::context::current();
1064        let response = client.dynamic_playlist_import(ctx, tmpfile_path).await?;
1065        assert!(
1066            matches!(
1067                response,
1068                Err(SerializableLibraryError::BackupError(
1069                    BackupError::InvalidDynamicPlaylistQuery(_, 2)
1070                ))
1071            ),
1072            "response: {response:?}"
1073        );
1074        Ok(())
1075    }
1076
1077    // Dynamic Playlist Export Tests
1078    #[rstest]
1079    #[tokio::test]
1080    async fn test_dynamic_playlist_export(#[future] client: MusicPlayerClient) -> Result<()> {
1081        let client = client.await;
1082
1083        let tmpdir = tempfile::tempdir()?;
1084        let path = tmpdir.path().join("test.csv");
1085
1086        let query: Query = "artist CONTAINS \"Artist 0\"".parse()?;
1087        let ctx = tarpc::context::current();
1088        let _ = client
1089            .dynamic_playlist_create(ctx, "Dynamic Playlist 0".into(), query.clone())
1090            .await?
1091            .unwrap();
1092
1093        let expected = r#"dynamic playlist name,query
1094Dynamic Playlist 0,"artist CONTAINS ""Artist 0"""
1095"#;
1096
1097        let response = client.dynamic_playlist_export(ctx, path.clone()).await?;
1098        assert_eq!(response, Ok(()));
1099
1100        let mut file = std::fs::File::open(path.clone())?;
1101        let mut contents = String::new();
1102        file.read_to_string(&mut contents)?;
1103        assert_str_eq!(contents, expected);
1104
1105        Ok(())
1106    }
1107    #[rstest]
1108    #[tokio::test]
1109    async fn test_dynamic_playlist_export_file_exists(
1110        #[future] client: MusicPlayerClient,
1111    ) -> Result<()> {
1112        let client = client.await;
1113
1114        let tmpfile = tempfile::NamedTempFile::with_suffix("dps.csv")?;
1115
1116        let ctx = tarpc::context::current();
1117        let response = client
1118            .dynamic_playlist_export(ctx, tmpfile.path().to_path_buf())
1119            .await?;
1120        assert!(matches!(response, Ok(())), "response: {response:?}");
1121        Ok(())
1122    }
1123    #[rstest]
1124    #[tokio::test]
1125    async fn test_dynamic_playlist_export_file_is_directory(
1126        #[future] client: MusicPlayerClient,
1127    ) -> Result<()> {
1128        let client = client.await;
1129
1130        let tmpfile = tempfile::tempdir()?;
1131
1132        let ctx = tarpc::context::current();
1133        let response = client
1134            .dynamic_playlist_export(ctx, tmpfile.path().to_path_buf())
1135            .await?;
1136        assert!(
1137            matches!(
1138                response,
1139                Err(SerializableLibraryError::BackupError(
1140                    BackupError::PathIsDirectory(_)
1141                ))
1142            ),
1143            "response: {response:?}"
1144        );
1145        Ok(())
1146    }
1147    #[rstest]
1148    #[tokio::test]
1149    async fn test_dynamic_playlist_export_file_invalid_extension(
1150        #[future] client: MusicPlayerClient,
1151    ) -> Result<()> {
1152        let client = client.await;
1153
1154        let tmpfile = tempfile::NamedTempFile::with_suffix("dps.txt")?;
1155
1156        let ctx = tarpc::context::current();
1157        let response = client
1158            .dynamic_playlist_export(ctx, tmpfile.path().to_path_buf())
1159            .await?;
1160        assert!(response.is_err(), "response: {response:?}");
1161        let err = response.unwrap_err();
1162        assert!(
1163            matches!(
1164                &err,
1165                SerializableLibraryError::BackupError(
1166                    BackupError::WrongExtension(_, expected_extension)
1167                ) if expected_extension == "csv"
1168            ),
1169            "response: {err:?}"
1170        );
1171
1172        Ok(())
1173    }
1174
1175    // Playlist import test
1176    #[rstest]
1177    #[tokio::test]
1178    async fn test_playlist_import(#[future] client: MusicPlayerClient) -> Result<()> {
1179        let client = client.await;
1180
1181        let tmpfile = tempfile::NamedTempFile::with_suffix("pl.m3u")?;
1182
1183        // write a csv file to the temp file
1184        let mut file = tmpfile.reopen()?;
1185        write!(
1186            file,
1187            r"#EXTM3U
1188#EXTINF:123,Sample Artist - Sample title
1189/path/to/song.mp3
1190"
1191        )?;
1192
1193        let tmpfile_path = tmpfile.path().to_path_buf();
1194
1195        let ctx = tarpc::context::current();
1196        let response = client.playlist_import(ctx, tmpfile_path, None).await?;
1197        assert!(response.is_ok());
1198        let response = response.unwrap();
1199
1200        let ctx = tarpc::context::current();
1201        let playlist = client.playlist_get(ctx, response.clone()).await?.unwrap();
1202
1203        assert_eq!(playlist.name, "Imported Playlist");
1204        assert_eq!(playlist.song_count, 1);
1205
1206        let ctx = tarpc::context::current();
1207        let songs = client
1208            .playlist_get_songs(ctx, response.clone())
1209            .await?
1210            .unwrap();
1211        assert_eq!(songs.len(), 1);
1212        assert_eq!(songs[0].path.to_string_lossy(), "/path/to/song.mp3");
1213
1214        Ok(())
1215    }
1216
1217    #[rstest]
1218    #[tokio::test]
1219    async fn test_playlist_export(#[future] client: MusicPlayerClient) -> Result<()> {
1220        let client = client.await;
1221
1222        let tmpdir = tempfile::tempdir()?;
1223        let path = tmpdir.path().join("test.m3u");
1224
1225        let ctx = tarpc::context::current();
1226        let library_full: LibraryFull = client.library_full(ctx).await??;
1227
1228        let playlist = library_full.playlists[0].clone();
1229
1230        let response = client
1231            .playlist_export(ctx, playlist.id.clone().into(), path.clone())
1232            .await?;
1233        assert_eq!(response, Ok(()));
1234
1235        let mut file = std::fs::File::open(path.clone())?;
1236        let mut contents = String::new();
1237        file.read_to_string(&mut contents)?;
1238        assert_str_eq!(
1239            contents,
1240            r"#EXTM3U
1241
1242#PLAYLIST:Playlist 0
1243
1244#EXTINF:120,Song 0 - Artist 0
1245#EXTGENRE:Genre 0
1246#EXTALB:Artist 0
1247/path/to/song.mp3
1248
1249"
1250        );
1251
1252        Ok(())
1253    }
1254}