mecomp_daemon/
lib.rs

1#![deny(clippy::missing_inline_in_public_items)]
2
3//----------------------------------------------------------------------------------------- std lib
4use std::{
5    net::{IpAddr, Ipv4Addr},
6    path::PathBuf,
7    sync::Arc,
8};
9//--------------------------------------------------------------------------------- other libraries
10use futures::{future, prelude::*};
11use log::{error, info};
12use persistence::QueueState;
13use surrealdb::{Surreal, engine::local::Db};
14use tarpc::{
15    self,
16    serde_transport::tcp,
17    server::{BaseChannel, Channel as _, incoming::Incoming as _},
18    tokio_serde::formats::Json,
19};
20use tokio::{runtime::Handle, sync::RwLock};
21use tracing::Instrument;
22//-------------------------------------------------------------------------------- MECOMP libraries
23use mecomp_core::{
24    audio::{AudioKernelSender, commands::AudioCommand},
25    config::Settings,
26    is_server_running,
27    logger::{init_logger, init_tracing},
28    rpc::{MusicPlayer as _, MusicPlayerClient},
29    udp::{Message, Sender, StateChange},
30};
31use mecomp_storage::db::{init_database, set_database_path};
32
33async fn spawn<F>(fut: F)
34where
35    F: Future<Output = ()> + Send + 'static,
36{
37    if size_of::<F>() > 1024 {
38        // if the future is too big, box it before spawning
39        let fut = Box::pin(fut);
40        tokio::spawn(fut);
41    } else {
42        // if the future is small enough, spawn it directly
43        tokio::spawn(fut);
44    }
45}
46
47pub mod controller;
48#[cfg(feature = "dynamic_updates")]
49pub mod dynamic_updates;
50pub mod persistence;
51pub mod services;
52pub mod termination;
53#[cfg(test)]
54pub use mecomp_core::test_utils;
55
56use crate::controller::MusicPlayerServer;
57
/// The maximum number of channels (connections) accepted from a single IP address.
///
/// Passed to `max_channels_per_key` when the RPC server is set up in `start_daemon`.
const CHANNELS_PER_IP: u32 = 10;
/// The maximum number of concurrent requests.
///
/// Used as the `buffer_unordered` limit on the stream of accepted channels in `start_daemon`.
pub const MAX_CONCURRENT_REQUESTS: usize = 4;
62
/// Event Publisher guard
///
/// Bundles the UDP dispatcher with the channel and the background task that feed it.
/// The background task is asked to stop when the guard is dropped.
struct EventPublisher {
    /// UDP sender used to publish messages to listeners; shared behind an async `RwLock`.
    dispatcher: Arc<RwLock<Sender<Message>>>,
    /// Sending half of the channel whose `StateChange`s the background task forwards over UDP.
    event_tx: std::sync::mpsc::Sender<StateChange>,
    /// Handle of the blocking task that performs the forwarding.
    handle: tokio::task::JoinHandle<()>,
}
71
72impl EventPublisher {
73    /// Start the event publisher
74    pub async fn new() -> Self {
75        let (event_tx, event_rx) = std::sync::mpsc::channel();
76        let event_publisher = Arc::new(RwLock::const_new(Sender::new().await.unwrap()));
77        let event_publisher_clone = event_publisher.clone();
78
79        let handle = tokio::task::spawn_blocking(move || {
80            while let Ok(event) = event_rx.recv() {
81                // re-enter the async context to send the event over UDP
82                Handle::current().block_on(async {
83                    if let Err(e) = event_publisher_clone
84                        .read()
85                        .await
86                        .send(Message::StateChange(event))
87                        .await
88                    {
89                        error!("Failed to send event over UDP: {e}");
90                    }
91                });
92            }
93        })
94        .instrument(tracing::info_span!("event_publisher"));
95
96        Self {
97            dispatcher: event_publisher,
98            event_tx,
99            handle: handle.into_inner(),
100        }
101    }
102}
103
impl Drop for EventPublisher {
    /// Requests cancellation of the forwarding task when the guard goes out of scope.
    fn drop(&mut self) {
        // Stop the event publisher thread
        // NOTE(review): `handle` comes from `spawn_blocking`; tokio documents that aborting a
        // blocking task has no effect once it has started running, so this may be a no-op in
        // practice. Confirm that the loop really ends through the `event_tx` senders being dropped.
        self.handle.abort();
    }
}
110
111// TODO: at some point, we should probably add a panic handler to the daemon to ensure graceful shutdown.
112
113/// Run the daemon
114///
115/// also initializes the logger, database, and other necessary components.
116///
117/// # Arguments
118///
119/// * `settings` - The settings to use.
120/// * `db_dir` - The directory where the database is stored.
121///   If the directory does not exist, it will be created.
122/// * `log_file_path` - The path to the file where logs will be written.
123/// * `state_file_path` - The path to the file where the queue state restored from / saved to.
124///
125/// # Errors
126///
127/// If the daemon cannot be started, an error is returned.
128///
129/// # Panics
130///
131/// Panics if the peer address of the underlying TCP transport cannot be determined.
132#[inline]
133#[allow(clippy::redundant_pub_crate)]
134pub async fn start_daemon(
135    settings: Settings,
136    db_dir: PathBuf,
137    log_file_path: Option<PathBuf>,
138    state_file_path: Option<PathBuf>,
139) -> anyhow::Result<()> {
140    // Throw the given settings into an Arc so we can share settings across threads.
141    let settings = Arc::new(settings);
142
143    // check if a server is already running
144    if is_server_running(settings.daemon.rpc_port) {
145        anyhow::bail!(
146            "A server is already running on port {}",
147            settings.daemon.rpc_port
148        );
149    }
150
151    // Initialize the logger, database, and tracing.
152    init_logger(settings.daemon.log_level, log_file_path);
153    set_database_path(db_dir)?;
154    let db = Arc::new(init_database().await?);
155    tracing::subscriber::set_global_default(init_tracing())?;
156
157    // initialize the termination handler
158    let (terminator, mut interrupt_rx) = termination::create_termination();
159
160    // Start the music library watcher.
161    #[cfg(feature = "dynamic_updates")]
162    let guard = dynamic_updates::init_music_library_watcher(
163        db.clone(),
164        &settings.daemon.library_paths,
165        settings.daemon.artist_separator.clone(),
166        settings.daemon.protected_artist_names.clone(),
167        settings.daemon.genre_separator.clone(),
168        interrupt_rx.resubscribe(),
169    )?;
170
171    // initialize the event publisher
172    let event_publisher_guard = EventPublisher::new().await;
173
174    // Start the audio kernel.
175    let audio_kernel = AudioKernelSender::start(event_publisher_guard.event_tx.clone());
176
177    // optionally restore the queue state
178    if let Some(state_path) = &state_file_path {
179        info!("Restoring queue state from {}", state_path.display());
180        if let Err(e) =
181            QueueState::load_from_file(state_path).map(|state| state.restore_to(&audio_kernel))
182        {
183            error!("Failed to restore queue state: {e}");
184        }
185    }
186
187    // Initialize the server.
188    let server = MusicPlayerServer::new(
189        db.clone(),
190        settings.clone(),
191        audio_kernel.clone(),
192        event_publisher_guard.dispatcher.clone(),
193        terminator.clone(),
194        interrupt_rx.resubscribe(),
195    );
196
197    // Start the RPC server.
198    let server_addr = (IpAddr::V4(Ipv4Addr::LOCALHOST), settings.daemon.rpc_port);
199
200    let mut listener = match tcp::listen(&server_addr, Json::default).await {
201        Ok(listener) => listener,
202        Err(e) => {
203            error!("Failed to start server: {e}");
204
205            #[cfg(feature = "dynamic_updates")]
206            guard.stop();
207
208            return Err(anyhow::anyhow!("Failed to start server: {e}"));
209        }
210    };
211    info!("Listening on {}", listener.local_addr());
212    listener.config_mut().max_frame_length(usize::MAX);
213    let server_handle = listener
214        // Ignore accept errors.
215        .filter_map(|r| future::ready(r.ok()))
216        .map(BaseChannel::with_defaults)
217        // Limit channels to 10 per IP.
218        .max_channels_per_key(CHANNELS_PER_IP, |t| t.transport().peer_addr().unwrap().ip())
219        // Set up the server's handling of incoming connections.
220        // serve is generated by the service attribute.
221        // It takes as input any type implementing the generated MusicPlayer trait.
222        .map(|channel| channel.execute(server.clone().serve()).for_each(spawn))
223        // Max 4 channels.
224        // this means that we will only process 4 requests at a time
225        // NOTE: if we have issues with concurrency (e.g. deadlocks or data-races),
226        //       and have too much of a skill issue to fix it, we can set this number to 1.
227        .buffer_unordered(MAX_CONCURRENT_REQUESTS)
228        .for_each(async |()| {})
229        // make it fused so we can stop it later
230        .fuse();
231
232    // run the server until it is terminated
233    tokio::select! {
234        () = server_handle => {
235            error!("Server stopped unexpectedly");
236        },
237        // Wait for the server to be stopped.
238        // This will be triggered by the signal handler.
239        reason = interrupt_rx.wait() => {
240            match reason {
241                Ok(termination::Interrupted::UserInt) => info!("Stopping server per user request"),
242                Ok(termination::Interrupted::OsSigInt) => info!("Stopping server because of an os sig int"),
243                Ok(termination::Interrupted::OsSigTerm) => info!("Stopping server because of an os sig term"),
244                Ok(termination::Interrupted::OsSigQuit) => info!("Stopping server because of an os sig quit"),
245                Err(e) => error!("Stopping server because of an unexpected error: {e}"),
246            }
247        }
248    }
249
250    #[cfg(feature = "dynamic_updates")]
251    guard.stop();
252
253    // send a shutdown event to all clients (ignore errors)
254    let _ = event_publisher_guard
255        .dispatcher
256        .read()
257        .await
258        .send(Message::Event(mecomp_core::udp::Event::DaemonShutdown))
259        .await;
260
261    if let Some(state_path) = &state_file_path {
262        info!("Persisting queue state to {}", state_path.display());
263        if let Err(e) = QueueState::retrieve(&audio_kernel)
264            .await
265            .and_then(|state| state.save_to_file(state_path))
266        {
267            error!("Failed to persist queue state: {e}");
268        }
269    }
270
271    log::info!("Cleanup complete, exiting...");
272
273    Ok(())
274}
275
276/// Initialize a test client, sends and receives messages over a channel / pipe.
277/// This is useful for testing the server without needing to start it.
278///
279/// # Errors
280///
281/// Errors if the event publisher cannot be created.
282#[inline]
283pub async fn init_test_client_server(
284    db: Arc<Surreal<Db>>,
285    settings: Arc<Settings>,
286    audio_kernel: Arc<AudioKernelSender>,
287) -> anyhow::Result<MusicPlayerClient> {
288    let (client_transport, server_transport) = tarpc::transport::channel::unbounded();
289
290    let event_publisher = Arc::new(RwLock::new(Sender::new().await?));
291    // initialize the termination handler
292    let (terminator, mut interrupt_rx) = termination::create_termination();
293    #[allow(clippy::redundant_pub_crate)]
294    tokio::spawn(async move {
295        let server = MusicPlayerServer::new(
296            db,
297            settings,
298            audio_kernel.clone(),
299            event_publisher.clone(),
300            terminator,
301            interrupt_rx.resubscribe(),
302        );
303        tokio::select! {
304            () = tarpc::server::BaseChannel::with_defaults(server_transport)
305                .execute(server.serve())
306                // Handle all requests concurrently.
307                .for_each(async |response| {
308                    tokio::spawn(response);
309                }) => {},
310            // Wait for the server to be stopped.
311            _ = interrupt_rx.wait() => {
312                // Stop the server.
313                info!("Stopping server...");
314                audio_kernel.send(AudioCommand::Exit);
315                let _ = event_publisher.read().await.send(Message::Event(mecomp_core::udp::Event::DaemonShutdown)).await;
316                info!("Server stopped");
317            }
318        }
319    });
320
321    // MusicPlayerClient is generated by the #[tarpc::service] attribute. It has a constructor `new`
322    // that takes a config and any Transport as input.
323    Ok(MusicPlayerClient::new(tarpc::client::Config::default(), client_transport).spawn())
324}
325
326#[cfg(test)]
327mod test_client_tests {
328    //! Tests for:
329    //! - the `init_test_client_server` function
330    //! - daemon endpoints that aren't covered in other tests
331
332    use std::io::{Read, Write};
333
334    use super::*;
335    use anyhow::Result;
336    use mecomp_core::{
337        errors::{BackupError, SerializableLibraryError},
338        state::library::LibraryFull,
339    };
340    use mecomp_storage::{
341        db::schemas::{
342            collection::Collection,
343            dynamic::{DynamicPlaylist, DynamicPlaylistChangeSet, query::Query},
344            playlist::Playlist,
345            song::SongChangeSet,
346        },
347        test_utils::{SongCase, create_song_with_overrides, init_test_database},
348    };
349
350    use pretty_assertions::{assert_eq, assert_str_eq};
351    use rstest::{fixture, rstest};
352
353    #[fixture]
354    async fn db() -> Arc<Surreal<Db>> {
355        let db = Arc::new(init_test_database().await.unwrap());
356
357        // create a test song, add it to a playlist and collection
358
359        let song_case = SongCase::new(0, vec![0], vec![0], 0, 0);
360
361        // Call the create_song function
362        let song = create_song_with_overrides(
363            &db,
364            song_case,
365            SongChangeSet {
366                // need to specify overrides so that items are created in the db
367                artist: Some(one_or_many::OneOrMany::One("Artist 0".into())),
368                album_artist: Some(one_or_many::OneOrMany::One("Artist 0".into())),
369                album: Some("Album 0".into()),
370                path: Some("/path/to/song.mp3".into()),
371                ..Default::default()
372            },
373        )
374        .await
375        .unwrap();
376
377        // create a playlist with the song
378        let playlist = Playlist {
379            id: Playlist::generate_id(),
380            name: "Playlist 0".into(),
381            runtime: song.runtime,
382            song_count: 1,
383        };
384
385        let result = Playlist::create(&db, playlist).await.unwrap().unwrap();
386
387        Playlist::add_songs(&db, result.id, vec![song.id.clone()])
388            .await
389            .unwrap();
390
391        // create a collection with the song
392        let collection = Collection {
393            id: Collection::generate_id(),
394            name: "Collection 0".into(),
395            runtime: song.runtime,
396            song_count: 1,
397        };
398
399        let result = Collection::create(&db, collection).await.unwrap().unwrap();
400
401        Collection::add_songs(&db, result.id, vec![song.id])
402            .await
403            .unwrap();
404
405        return db;
406    }
407
408    #[fixture]
409    async fn client(#[future] db: Arc<Surreal<Db>>) -> MusicPlayerClient {
410        let settings = Arc::new(Settings::default());
411        let (tx, _) = std::sync::mpsc::channel();
412        let audio_kernel = AudioKernelSender::start(tx);
413
414        init_test_client_server(db.await, settings, audio_kernel)
415            .await
416            .unwrap()
417    }
418
419    #[tokio::test]
420    async fn test_init_test_client_server() {
421        let db = Arc::new(init_test_database().await.unwrap());
422        let settings = Arc::new(Settings::default());
423        let (tx, _) = std::sync::mpsc::channel();
424        let audio_kernel = AudioKernelSender::start(tx);
425
426        let client = init_test_client_server(db, settings, audio_kernel)
427            .await
428            .unwrap();
429
430        let ctx = tarpc::context::current();
431        let response = client.ping(ctx).await.unwrap();
432
433        assert_eq!(response, "pong");
434
435        // ensure that the client is shutdown properly
436        drop(client);
437    }
438
439    #[rstest]
440    #[tokio::test]
441    async fn test_library_song_get_artist(#[future] client: MusicPlayerClient) -> Result<()> {
442        let client = client.await;
443
444        let ctx = tarpc::context::current();
445        let library_full: LibraryFull = client.library_full(ctx).await??;
446
447        let ctx = tarpc::context::current();
448        let response = client
449            .library_song_get_artist(ctx, library_full.songs.first().unwrap().id.clone().into())
450            .await?;
451
452        assert_eq!(response, library_full.artists.into_vec().into());
453
454        Ok(())
455    }
456
457    #[rstest]
458    #[tokio::test]
459    async fn test_library_song_get_album(#[future] client: MusicPlayerClient) -> Result<()> {
460        let client = client.await;
461
462        let ctx = tarpc::context::current();
463        let library_full: LibraryFull = client.library_full(ctx).await??;
464
465        let ctx = tarpc::context::current();
466        let response = client
467            .library_song_get_album(ctx, library_full.songs.first().unwrap().id.clone().into())
468            .await?
469            .unwrap();
470
471        assert_eq!(response, library_full.albums.first().unwrap().clone());
472
473        Ok(())
474    }
475
476    #[rstest]
477    #[tokio::test]
478    async fn test_library_song_get_playlists(#[future] client: MusicPlayerClient) -> Result<()> {
479        let client = client.await;
480
481        let ctx = tarpc::context::current();
482        let library_full: LibraryFull = client.library_full(ctx).await??;
483
484        let ctx = tarpc::context::current();
485        let response = client
486            .library_song_get_playlists(ctx, library_full.songs.first().unwrap().id.clone().into())
487            .await?;
488
489        assert_eq!(response, library_full.playlists.into_vec().into());
490
491        Ok(())
492    }
493
494    #[rstest]
495    #[tokio::test]
496    async fn test_library_album_get_artist(#[future] client: MusicPlayerClient) -> Result<()> {
497        let client = client.await;
498
499        let ctx = tarpc::context::current();
500        let library_full: LibraryFull = client.library_full(ctx).await??;
501
502        let ctx = tarpc::context::current();
503        let response = client
504            .library_album_get_artist(ctx, library_full.albums.first().unwrap().id.clone().into())
505            .await?;
506
507        assert_eq!(response, library_full.artists.into_vec().into());
508
509        Ok(())
510    }
511
512    #[rstest]
513    #[tokio::test]
514    async fn test_library_album_get_songs(#[future] client: MusicPlayerClient) -> Result<()> {
515        let client = client.await;
516
517        let ctx = tarpc::context::current();
518        let library_full: LibraryFull = client.library_full(ctx).await??;
519
520        let ctx = tarpc::context::current();
521        let response = client
522            .library_album_get_songs(ctx, library_full.albums.first().unwrap().id.clone().into())
523            .await?
524            .unwrap();
525
526        assert_eq!(response, library_full.songs);
527
528        Ok(())
529    }
530
531    #[rstest]
532    #[tokio::test]
533    async fn test_library_artist_get_songs(#[future] client: MusicPlayerClient) -> Result<()> {
534        let client = client.await;
535
536        let ctx = tarpc::context::current();
537        let library_full: LibraryFull = client.library_full(ctx).await??;
538
539        let ctx = tarpc::context::current();
540        let response = client
541            .library_artist_get_songs(ctx, library_full.artists.first().unwrap().id.clone().into())
542            .await?
543            .unwrap();
544
545        assert_eq!(response, library_full.songs);
546
547        Ok(())
548    }
549
550    #[rstest]
551    #[tokio::test]
552    async fn test_library_artist_get_albums(#[future] client: MusicPlayerClient) -> Result<()> {
553        let client = client.await;
554
555        let ctx = tarpc::context::current();
556        let library_full: LibraryFull = client.library_full(ctx).await??;
557
558        let ctx = tarpc::context::current();
559        let response = client
560            .library_artist_get_albums(ctx, library_full.artists.first().unwrap().id.clone().into())
561            .await?
562            .unwrap();
563
564        assert_eq!(response, library_full.albums);
565
566        Ok(())
567    }
568
569    #[rstest]
570    #[tokio::test]
571    async fn test_playback_volume_toggle_mute(#[future] client: MusicPlayerClient) -> Result<()> {
572        let client = client.await;
573
574        let ctx = tarpc::context::current();
575
576        client.playback_volume_toggle_mute(ctx).await?;
577        Ok(())
578    }
579
580    #[rstest]
581    #[tokio::test]
582    async fn test_playback_stop(#[future] client: MusicPlayerClient) -> Result<()> {
583        let client = client.await;
584
585        let ctx = tarpc::context::current();
586
587        client.playback_stop(ctx).await?;
588        Ok(())
589    }
590
591    #[rstest]
592    #[tokio::test]
593    async fn test_queue_add_list(#[future] client: MusicPlayerClient) -> Result<()> {
594        let client = client.await;
595
596        let ctx = tarpc::context::current();
597        let library_full: LibraryFull = client.library_full(ctx).await??;
598
599        let ctx = tarpc::context::current();
600        let response = client
601            .queue_add_list(
602                ctx,
603                vec![library_full.songs.first().unwrap().id.clone().into()],
604            )
605            .await?;
606
607        assert_eq!(response, Ok(()));
608
609        Ok(())
610    }
611
612    #[rstest]
613    #[case::get(String::from("Playlist 0"))]
614    #[case::create(String::from("Playlist 1"))]
615    #[tokio::test]
616    async fn test_playlist_get_or_create(
617        #[future] client: MusicPlayerClient,
618        #[case] name: String,
619    ) -> Result<()> {
620        let client = client.await;
621
622        let ctx = tarpc::context::current();
623
624        // get or create the playlist
625        let playlist_id = client
626            .playlist_get_or_create(ctx, name.clone())
627            .await?
628            .unwrap();
629
630        // now get that playlist
631        let ctx = tarpc::context::current();
632        let playlist = client.playlist_get(ctx, playlist_id).await?.unwrap();
633
634        assert_eq!(playlist.name, name);
635
636        Ok(())
637    }
638
639    #[rstest]
640    #[tokio::test]
641    async fn test_playlist_clone(#[future] client: MusicPlayerClient) -> Result<()> {
642        let client = client.await;
643
644        let ctx = tarpc::context::current();
645        let library_full: LibraryFull = client.library_full(ctx).await??;
646
647        // clone the only playlist in the db
648        let ctx = tarpc::context::current();
649        let playlist_id = client
650            .playlist_clone(
651                ctx,
652                library_full.playlists.first().unwrap().id.clone().into(),
653            )
654            .await?
655            .unwrap();
656
657        // now get that playlist
658        let ctx = tarpc::context::current();
659        let playlist = client.playlist_get(ctx, playlist_id).await?.unwrap();
660
661        assert_eq!(playlist.name, "Playlist 0 (copy)");
662
663        Ok(())
664    }
665
666    #[rstest]
667    #[tokio::test]
668    async fn test_playlist_get_songs(#[future] client: MusicPlayerClient) -> Result<()> {
669        let client = client.await;
670
671        let ctx = tarpc::context::current();
672        let library_full: LibraryFull = client.library_full(ctx).await??;
673
674        // clone the only playlist in the db
675        let response = client
676            .playlist_get_songs(
677                ctx,
678                library_full.playlists.first().unwrap().id.clone().into(),
679            )
680            .await?
681            .unwrap();
682
683        assert_eq!(response, library_full.songs);
684
685        Ok(())
686    }
687
688    #[rstest]
689    #[tokio::test]
690    async fn test_playlist_rename(#[future] client: MusicPlayerClient) -> Result<()> {
691        let client = client.await;
692
693        let ctx = tarpc::context::current();
694        let library_full: LibraryFull = client.library_full(ctx).await??;
695
696        let target = library_full.playlists.first().unwrap();
697
698        let ctx = tarpc::context::current();
699        let response = client
700            .playlist_rename(ctx, target.id.clone().into(), "New Name".into())
701            .await?;
702
703        let expected = Playlist {
704            name: "New Name".into(),
705            ..target.clone()
706        };
707
708        assert_eq!(response, Ok(expected.clone()));
709
710        let ctx = tarpc::context::current();
711        let response = client
712            .playlist_get(ctx, target.id.clone().into())
713            .await?
714            .unwrap();
715
716        assert_eq!(response, expected);
717        Ok(())
718    }
719
720    #[rstest]
721    #[tokio::test]
722    async fn test_collection_get_songs(#[future] client: MusicPlayerClient) -> Result<()> {
723        let client = client.await;
724
725        let ctx = tarpc::context::current();
726        let library_full: LibraryFull = client.library_full(ctx).await??;
727
728        // clone the only playlist in the db
729        let response = client
730            .collection_get_songs(
731                ctx,
732                library_full.collections.first().unwrap().id.clone().into(),
733            )
734            .await?
735            .unwrap();
736
737        assert_eq!(response, library_full.songs);
738
739        Ok(())
740    }
741
742    #[rstest]
743    #[tokio::test]
744    async fn test_dynamic_playlist_create(#[future] client: MusicPlayerClient) -> Result<()> {
745        let client = client.await;
746
747        let ctx = tarpc::context::current();
748
749        let query: Query = "artist CONTAINS \"Artist 0\"".parse()?;
750
751        let response = client
752            .dynamic_playlist_create(ctx, "Dynamic Playlist 0".into(), query)
753            .await?;
754
755        assert!(response.is_ok());
756
757        Ok(())
758    }
759
760    #[rstest]
761    #[tokio::test]
762    async fn test_dynamic_playlist_list(#[future] client: MusicPlayerClient) -> Result<()> {
763        let client = client.await;
764
765        let ctx = tarpc::context::current();
766
767        let query: Query = "artist CONTAINS \"Artist 0\"".parse()?;
768
769        let dynamic_playlist_id = client
770            .dynamic_playlist_create(ctx, "Dynamic Playlist 0".into(), query)
771            .await?
772            .unwrap();
773
774        let ctx = tarpc::context::current();
775        let response = client.dynamic_playlist_list(ctx).await?;
776
777        assert_eq!(response.len(), 1);
778        assert_eq!(response.first().unwrap().id, dynamic_playlist_id.into());
779
780        Ok(())
781    }
782
783    #[rstest]
784    #[tokio::test]
785    async fn test_dynamic_playlist_update(#[future] client: MusicPlayerClient) -> Result<()> {
786        let client = client.await;
787
788        let ctx = tarpc::context::current();
789
790        let query: Query = "artist CONTAINS \"Artist 0\"".parse()?;
791
792        let dynamic_playlist_id = client
793            .dynamic_playlist_create(ctx, "Dynamic Playlist 0".into(), query.clone())
794            .await?
795            .unwrap();
796
797        let ctx = tarpc::context::current();
798        let response = client
799            .dynamic_playlist_update(
800                ctx,
801                dynamic_playlist_id.clone(),
802                DynamicPlaylistChangeSet::new().name("Dynamic Playlist 1"),
803            )
804            .await?;
805
806        let expected = DynamicPlaylist {
807            id: dynamic_playlist_id.clone().into(),
808            name: "Dynamic Playlist 1".into(),
809            query: query.clone(),
810        };
811
812        assert_eq!(response, Ok(expected.clone()));
813
814        let ctx = tarpc::context::current();
815        let response = client
816            .dynamic_playlist_get(ctx, dynamic_playlist_id)
817            .await?
818            .unwrap();
819
820        assert_eq!(response, expected);
821
822        Ok(())
823    }
824
825    #[rstest]
826    #[tokio::test]
827    async fn test_dynamic_playlist_remove(#[future] client: MusicPlayerClient) -> Result<()> {
828        let client = client.await;
829
830        let ctx = tarpc::context::current();
831
832        let query: Query = "artist CONTAINS \"Artist 0\"".parse()?;
833
834        let dynamic_playlist_id = client
835            .dynamic_playlist_create(ctx, "Dynamic Playlist 0".into(), query)
836            .await?
837            .unwrap();
838
839        let ctx = tarpc::context::current();
840        let response = client
841            .dynamic_playlist_remove(ctx, dynamic_playlist_id)
842            .await?;
843
844        assert_eq!(response, Ok(()));
845
846        let ctx = tarpc::context::current();
847        let response = client.dynamic_playlist_list(ctx).await?;
848
849        assert_eq!(response.len(), 0);
850
851        Ok(())
852    }
853
854    #[rstest]
855    #[tokio::test]
856    async fn test_dynamic_playlist_get(#[future] client: MusicPlayerClient) -> Result<()> {
857        let client = client.await;
858
859        let ctx = tarpc::context::current();
860
861        let query: Query = "artist CONTAINS \"Artist 0\"".parse()?;
862
863        let dynamic_playlist_id = client
864            .dynamic_playlist_create(ctx, "Dynamic Playlist 0".into(), query.clone())
865            .await?
866            .unwrap();
867
868        let ctx = tarpc::context::current();
869        let response = client
870            .dynamic_playlist_get(ctx, dynamic_playlist_id)
871            .await?
872            .unwrap();
873
874        assert_eq!(response.name, "Dynamic Playlist 0");
875        assert_eq!(response.query, query);
876
877        Ok(())
878    }
879
880    #[rstest]
881    #[tokio::test]
882    async fn test_dynamic_playlist_get_songs(#[future] client: MusicPlayerClient) -> Result<()> {
883        let client = client.await;
884
885        let ctx = tarpc::context::current();
886
887        let query: Query = "artist CONTAINS \"Artist 0\"".parse()?;
888
889        let dynamic_playlist_id = client
890            .dynamic_playlist_create(ctx, "Dynamic Playlist 0".into(), query)
891            .await?
892            .unwrap();
893
894        let ctx = tarpc::context::current();
895        let response = client
896            .dynamic_playlist_get_songs(ctx, dynamic_playlist_id)
897            .await?
898            .unwrap();
899
900        assert_eq!(response.len(), 1);
901
902        Ok(())
903    }
904
905    // Dynamic Playlist Import Tests
906    #[rstest]
907    #[tokio::test]
908    async fn test_dynamic_playlist_import(#[future] client: MusicPlayerClient) -> Result<()> {
909        let client = client.await;
910
911        let tmpfile = tempfile::NamedTempFile::with_suffix("dps.csv")?;
912
913        // write a csv file to the temp file
914        let mut file = tmpfile.reopen()?;
915        writeln!(file, "dynamic playlist name,query")?;
916        writeln!(file, "Dynamic Playlist 0,artist CONTAINS \"Artist 0\"")?;
917
918        let tmpfile_path = tmpfile.path().to_path_buf();
919
920        let query: Query = "artist CONTAINS \"Artist 0\"".parse()?;
921
922        let ctx = tarpc::context::current();
923        let response = client.dynamic_playlist_import(ctx, tmpfile_path).await??;
924
925        let expected = DynamicPlaylist {
926            id: response[0].id.clone(),
927            name: "Dynamic Playlist 0".into(),
928            query: query.clone(),
929        };
930
931        assert_eq!(response, vec![expected]);
932
933        Ok(())
934    }
935    #[rstest]
936    #[tokio::test]
937    async fn test_dynamic_playlist_import_file_nonexistent(
938        #[future] client: MusicPlayerClient,
939    ) -> Result<()> {
940        let client = client.await;
941
942        let tmpfile = tempfile::NamedTempFile::with_suffix("dps.csv")?;
943
944        // write a csv file to the temp file
945        let mut file = tmpfile.reopen()?;
946        writeln!(file, "artist,album,album_artist,title")?;
947
948        let tmpfile_path = "/this/path/does/not/exist.csv";
949
950        let ctx = tarpc::context::current();
951        let response = client
952            .dynamic_playlist_import(ctx, tmpfile_path.into())
953            .await?;
954        assert!(response.is_err(), "response: {response:?}");
955        assert_eq!(
956            response.unwrap_err().to_string(),
957            format!("Backup Error: The file \"{tmpfile_path}\" does not exist")
958        );
959        Ok(())
960    }
961    #[rstest]
962    #[tokio::test]
963    async fn test_dynamic_playlist_import_file_wrong_extension(
964        #[future] client: MusicPlayerClient,
965    ) -> Result<()> {
966        let client = client.await;
967
968        let tmpfile = tempfile::NamedTempFile::with_suffix("dps.txt")?;
969
970        // write a csv file to the temp file
971        let mut file = tmpfile.reopen()?;
972        writeln!(file, "artist,album,album_artist,title")?;
973
974        let tmpfile_path = tmpfile.path().to_path_buf();
975
976        let ctx = tarpc::context::current();
977        let response = client
978            .dynamic_playlist_import(ctx, tmpfile_path.clone())
979            .await?;
980        assert!(response.is_err(), "response: {response:?}");
981        assert_eq!(
982            response.unwrap_err().to_string(),
983            format!(
984                "Backup Error: The file \"{}\" has the wrong extension, expected: csv",
985                tmpfile_path.display()
986            )
987        );
988        Ok(())
989    }
990    #[rstest]
991    #[tokio::test]
992    async fn test_dynamic_playlist_import_file_is_directory(
993        #[future] client: MusicPlayerClient,
994    ) -> Result<()> {
995        let client = client.await;
996
997        let tmpfile = tempfile::tempdir()?;
998
999        let tmpfile_path = tmpfile.path().to_path_buf();
1000
1001        let ctx = tarpc::context::current();
1002        let response = client
1003            .dynamic_playlist_import(ctx, tmpfile_path.clone())
1004            .await?;
1005        assert!(response.is_err());
1006        assert_eq!(
1007            response.unwrap_err().to_string(),
1008            format!(
1009                "Backup Error: {} is a directory, not a file",
1010                tmpfile_path.display()
1011            )
1012        );
1013        Ok(())
1014    }
1015    #[rstest]
1016    #[tokio::test]
1017    async fn test_dynamic_playlist_import_file_invalid_format(
1018        #[future] client: MusicPlayerClient,
1019    ) -> Result<()> {
1020        let client = client.await;
1021
1022        let tmpfile = tempfile::NamedTempFile::with_suffix("dps.csv")?;
1023
1024        // write a csv file to the temp file
1025        let mut file = tmpfile.reopen()?;
1026        writeln!(file, "artist,album,album_artist,title")?;
1027
1028        let tmpfile_path = tmpfile.path().to_path_buf();
1029
1030        let ctx = tarpc::context::current();
1031        let response = client.dynamic_playlist_import(ctx, tmpfile_path).await?;
1032        assert!(response.is_err());
1033        assert_eq!(
1034            response.unwrap_err().to_string(),
1035            "Backup Error: No valid playlists were found in the csv file."
1036        );
1037        Ok(())
1038    }
1039    #[rstest]
1040    #[tokio::test]
1041    async fn test_dynamic_playlist_import_file_invalid_query(
1042        #[future] client: MusicPlayerClient,
1043    ) -> Result<()> {
1044        let client = client.await;
1045
1046        let tmpfile = tempfile::NamedTempFile::with_suffix("dps.csv")?;
1047
1048        // write a csv file to the temp file
1049        let mut file = tmpfile.reopen()?;
1050        writeln!(file, "dynamic playlist name,query")?;
1051        writeln!(file, "Dynamic Playlist 0,artist CONTAINS \"Artist 0\"")?;
1052        writeln!(file, "Dynamic Playlist 1,artist CONTAINS \"")?;
1053
1054        let tmpfile_path = tmpfile.path().to_path_buf();
1055
1056        let ctx = tarpc::context::current();
1057        let response = client.dynamic_playlist_import(ctx, tmpfile_path).await?;
1058        assert!(
1059            matches!(
1060                response,
1061                Err(SerializableLibraryError::BackupError(
1062                    BackupError::InvalidDynamicPlaylistQuery(_, 2)
1063                ))
1064            ),
1065            "response: {response:?}"
1066        );
1067        Ok(())
1068    }
1069
1070    // Dynamic Playlist Export Tests
1071    #[rstest]
1072    #[tokio::test]
1073    async fn test_dynamic_playlist_export(#[future] client: MusicPlayerClient) -> Result<()> {
1074        let client = client.await;
1075
1076        let tmpdir = tempfile::tempdir()?;
1077        let path = tmpdir.path().join("test.csv");
1078
1079        let query: Query = "artist CONTAINS \"Artist 0\"".parse()?;
1080        let ctx = tarpc::context::current();
1081        let _ = client
1082            .dynamic_playlist_create(ctx, "Dynamic Playlist 0".into(), query.clone())
1083            .await?
1084            .unwrap();
1085
1086        let expected = r#"dynamic playlist name,query
1087Dynamic Playlist 0,"artist CONTAINS ""Artist 0"""
1088"#;
1089
1090        let response = client.dynamic_playlist_export(ctx, path.clone()).await?;
1091        assert_eq!(response, Ok(()));
1092
1093        let mut file = std::fs::File::open(path.clone())?;
1094        let mut contents = String::new();
1095        file.read_to_string(&mut contents)?;
1096        assert_str_eq!(contents, expected);
1097
1098        Ok(())
1099    }
1100    #[rstest]
1101    #[tokio::test]
1102    async fn test_dynamic_playlist_export_file_exists(
1103        #[future] client: MusicPlayerClient,
1104    ) -> Result<()> {
1105        let client = client.await;
1106
1107        let tmpfile = tempfile::NamedTempFile::with_suffix("dps.csv")?;
1108
1109        let ctx = tarpc::context::current();
1110        let response = client
1111            .dynamic_playlist_export(ctx, tmpfile.path().to_path_buf())
1112            .await?;
1113        assert!(matches!(response, Ok(())), "response: {response:?}");
1114        Ok(())
1115    }
1116    #[rstest]
1117    #[tokio::test]
1118    async fn test_dynamic_playlist_export_file_is_directory(
1119        #[future] client: MusicPlayerClient,
1120    ) -> Result<()> {
1121        let client = client.await;
1122
1123        let tmpfile = tempfile::tempdir()?;
1124
1125        let ctx = tarpc::context::current();
1126        let response = client
1127            .dynamic_playlist_export(ctx, tmpfile.path().to_path_buf())
1128            .await?;
1129        assert!(
1130            matches!(
1131                response,
1132                Err(SerializableLibraryError::BackupError(
1133                    BackupError::PathIsDirectory(_)
1134                ))
1135            ),
1136            "response: {response:?}"
1137        );
1138        Ok(())
1139    }
1140    #[rstest]
1141    #[tokio::test]
1142    async fn test_dynamic_playlist_export_file_invalid_extension(
1143        #[future] client: MusicPlayerClient,
1144    ) -> Result<()> {
1145        let client = client.await;
1146
1147        let tmpfile = tempfile::NamedTempFile::with_suffix("dps.txt")?;
1148
1149        let ctx = tarpc::context::current();
1150        let response = client
1151            .dynamic_playlist_export(ctx, tmpfile.path().to_path_buf())
1152            .await?;
1153        assert!(response.is_err(), "response: {response:?}");
1154        let err = response.unwrap_err();
1155        assert!(
1156            matches!(
1157                &err,
1158                SerializableLibraryError::BackupError(
1159                    BackupError::WrongExtension(_, expected_extension)
1160                ) if expected_extension == "csv"
1161            ),
1162            "response: {err:?}"
1163        );
1164
1165        Ok(())
1166    }
1167
1168    // Playlist import test
1169    #[rstest]
1170    #[tokio::test]
1171    async fn test_playlist_import(#[future] client: MusicPlayerClient) -> Result<()> {
1172        let client = client.await;
1173
1174        let tmpfile = tempfile::NamedTempFile::with_suffix("pl.m3u")?;
1175
1176        // write a csv file to the temp file
1177        let mut file = tmpfile.reopen()?;
1178        write!(
1179            file,
1180            r"#EXTM3U
1181#EXTINF:123,Sample Artist - Sample title
1182/path/to/song.mp3
1183"
1184        )?;
1185
1186        let tmpfile_path = tmpfile.path().to_path_buf();
1187
1188        let ctx = tarpc::context::current();
1189        let response = client.playlist_import(ctx, tmpfile_path, None).await?;
1190        assert!(response.is_ok());
1191        let response = response.unwrap();
1192
1193        let ctx = tarpc::context::current();
1194        let playlist = client.playlist_get(ctx, response.clone()).await?.unwrap();
1195
1196        assert_eq!(playlist.name, "Imported Playlist");
1197        assert_eq!(playlist.song_count, 1);
1198
1199        let ctx = tarpc::context::current();
1200        let songs = client
1201            .playlist_get_songs(ctx, response.clone())
1202            .await?
1203            .unwrap();
1204        assert_eq!(songs.len(), 1);
1205        assert_eq!(songs[0].path.to_string_lossy(), "/path/to/song.mp3");
1206
1207        Ok(())
1208    }
1209
1210    #[rstest]
1211    #[tokio::test]
1212    async fn test_playlist_export(#[future] client: MusicPlayerClient) -> Result<()> {
1213        let client = client.await;
1214
1215        let tmpdir = tempfile::tempdir()?;
1216        let path = tmpdir.path().join("test.m3u");
1217
1218        let ctx = tarpc::context::current();
1219        let library_full: LibraryFull = client.library_full(ctx).await??;
1220
1221        let playlist = library_full.playlists[0].clone();
1222
1223        let response = client
1224            .playlist_export(ctx, playlist.id.clone().into(), path.clone())
1225            .await?;
1226        assert_eq!(response, Ok(()));
1227
1228        let mut file = std::fs::File::open(path.clone())?;
1229        let mut contents = String::new();
1230        file.read_to_string(&mut contents)?;
1231        assert_str_eq!(
1232            contents,
1233            r"#EXTM3U
1234
1235#PLAYLIST:Playlist 0
1236
1237#EXTINF:120,Song 0 - Artist 0
1238#EXTGENRE:Genre 0
1239#EXTALB:Artist 0
1240/path/to/song.mp3
1241
1242"
1243        );
1244
1245        Ok(())
1246    }
1247}