mecomp_daemon/
lib.rs

1#![deny(clippy::missing_inline_in_public_items)]
2
3//----------------------------------------------------------------------------------------- std lib
4use std::{
5    net::{IpAddr, Ipv4Addr},
6    sync::Arc,
7};
8//--------------------------------------------------------------------------------- other libraries
9use futures::{future, prelude::*};
10use log::{error, info};
11use surrealdb::{Surreal, engine::local::Db};
12use tarpc::{
13    self,
14    serde_transport::tcp,
15    server::{BaseChannel, Channel as _, incoming::Incoming as _},
16    tokio_serde::formats::Json,
17};
18use tokio::{runtime::Handle, sync::RwLock};
19use tracing::Instrument;
20//-------------------------------------------------------------------------------- MECOMP libraries
21use mecomp_core::{
22    audio::{AudioKernelSender, commands::AudioCommand},
23    config::Settings,
24    is_server_running,
25    logger::{init_logger, init_tracing},
26    rpc::{MusicPlayer as _, MusicPlayerClient},
27    udp::{Message, Sender, StateChange},
28};
29use mecomp_storage::db::{init_database, set_database_path};
30
31async fn spawn<F>(fut: F)
32where
33    F: Future<Output = ()> + Send + 'static,
34{
35    if size_of::<F>() > 1024 {
36        // if the future is too big, box it before spawning
37        let fut = Box::pin(fut);
38        tokio::spawn(fut);
39    } else {
40        // if the future is small enough, spawn it directly
41        tokio::spawn(fut);
42    }
43}
44
45pub mod controller;
46#[cfg(feature = "dynamic_updates")]
47pub mod dynamic_updates;
48pub mod services;
49pub mod termination;
50#[cfg(test)]
51pub use mecomp_core::test_utils;
52
53use crate::controller::MusicPlayerServer;
54
/// Upper bound on the number of RPC channels accepted from a single IP address
/// (passed to `max_channels_per_key` when the server is set up).
const CHANNELS_PER_IP: u32 = 10;
/// Upper bound handed to `buffer_unordered` when the server is set up, i.e. how many
/// client channels are driven concurrently.
pub const MAX_CONCURRENT_REQUESTS: usize = 4;
59
60/// Event Publisher guard
61///
62/// This is a newtype for the event publisher that ensures it is stopped when the guard is dropped.
63struct EventPublisher {
64    dispatcher: Arc<RwLock<Sender<Message>>>,
65    event_tx: std::sync::mpsc::Sender<StateChange>,
66    handle: tokio::task::JoinHandle<()>,
67}
68
69impl EventPublisher {
70    /// Start the event publisher
71    pub async fn new() -> Self {
72        let (event_tx, event_rx) = std::sync::mpsc::channel();
73        let event_publisher = Arc::new(RwLock::const_new(Sender::new().await.unwrap()));
74        let event_publisher_clone = event_publisher.clone();
75
76        let handle = tokio::task::spawn_blocking(move || {
77            while let Ok(event) = event_rx.recv() {
78                // re-enter the async context to send the event over UDP
79                Handle::current().block_on(async {
80                    if let Err(e) = event_publisher_clone
81                        .read()
82                        .await
83                        .send(Message::StateChange(event))
84                        .await
85                    {
86                        error!("Failed to send event over UDP: {e}");
87                    }
88                });
89            }
90        })
91        .instrument(tracing::info_span!("event_publisher"));
92
93        Self {
94            dispatcher: event_publisher,
95            event_tx,
96            handle: handle.into_inner(),
97        }
98    }
99}
100
101impl Drop for EventPublisher {
102    fn drop(&mut self) {
103        // Stop the event publisher thread
104        self.handle.abort();
105    }
106}
107
108// TODO: at some point, we should probably add a panic handler to the daemon to ensure graceful shutdown.
109
110/// Run the daemon
111///
112/// also initializes the logger, database, and other necessary components.
113///
114/// # Arguments
115///
116/// * `settings` - The settings to use.
117/// * `db_dir` - The directory where the database is stored.
118///   If the directory does not exist, it will be created.
119/// * `log_file_path` - The path to the file where logs will be written.
120///
121/// # Errors
122///
123/// If the daemon cannot be started, an error is returned.
124///
125/// # Panics
126///
127/// Panics if the peer address of the underlying TCP transport cannot be determined.
128#[inline]
129#[allow(clippy::redundant_pub_crate)]
130pub async fn start_daemon(
131    settings: Settings,
132    db_dir: std::path::PathBuf,
133    log_file_path: Option<std::path::PathBuf>,
134) -> anyhow::Result<()> {
135    // Throw the given settings into an Arc so we can share settings across threads.
136    let settings = Arc::new(settings);
137
138    // check if a server is already running
139    if is_server_running(settings.daemon.rpc_port) {
140        anyhow::bail!(
141            "A server is already running on port {}",
142            settings.daemon.rpc_port
143        );
144    }
145
146    // Initialize the logger, database, and tracing.
147    init_logger(settings.daemon.log_level, log_file_path);
148    set_database_path(db_dir)?;
149    let db = Arc::new(init_database().await?);
150    tracing::subscriber::set_global_default(init_tracing())?;
151
152    // initialize the termination handler
153    let (terminator, mut interrupt_rx) = termination::create_termination();
154
155    // Start the music library watcher.
156    #[cfg(feature = "dynamic_updates")]
157    let guard = dynamic_updates::init_music_library_watcher(
158        db.clone(),
159        &settings.daemon.library_paths,
160        settings.daemon.artist_separator.clone(),
161        settings.daemon.protected_artist_names.clone(),
162        settings.daemon.genre_separator.clone(),
163        interrupt_rx.resubscribe(),
164    )?;
165
166    // initialize the event publisher
167    let event_publisher_guard = EventPublisher::new().await;
168
169    // Start the audio kernel.
170    let audio_kernel = AudioKernelSender::start(event_publisher_guard.event_tx.clone());
171
172    // Initialize the server.
173    let server = MusicPlayerServer::new(
174        db.clone(),
175        settings.clone(),
176        audio_kernel.clone(),
177        event_publisher_guard.dispatcher.clone(),
178        terminator.clone(),
179        interrupt_rx.resubscribe(),
180    );
181
182    // Start the RPC server.
183    let server_addr = (IpAddr::V4(Ipv4Addr::LOCALHOST), settings.daemon.rpc_port);
184
185    let mut listener = match tcp::listen(&server_addr, Json::default).await {
186        Ok(listener) => listener,
187        Err(e) => {
188            error!("Failed to start server: {e}");
189
190            #[cfg(feature = "dynamic_updates")]
191            guard.stop();
192
193            return Err(anyhow::anyhow!("Failed to start server: {e}"));
194        }
195    };
196    info!("Listening on {}", listener.local_addr());
197    listener.config_mut().max_frame_length(usize::MAX);
198    let server_handle = listener
199        // Ignore accept errors.
200        .filter_map(|r| future::ready(r.ok()))
201        .map(BaseChannel::with_defaults)
202        // Limit channels to 10 per IP.
203        .max_channels_per_key(CHANNELS_PER_IP, |t| t.transport().peer_addr().unwrap().ip())
204        // Set up the server's handling of incoming connections.
205        // serve is generated by the service attribute.
206        // It takes as input any type implementing the generated MusicPlayer trait.
207        .map(|channel| channel.execute(server.clone().serve()).for_each(spawn))
208        // Max 4 channels.
209        // this means that we will only process 4 requests at a time
210        // NOTE: if we have issues with concurrency (e.g. deadlocks or data-races),
211        //       and have too much of a skill issue to fix it, we can set this number to 1.
212        .buffer_unordered(MAX_CONCURRENT_REQUESTS)
213        .for_each(async |()| {})
214        // make it fused so we can stop it later
215        .fuse();
216
217    // run the server until it is terminated
218    tokio::select! {
219        () = server_handle => {
220            error!("Server stopped unexpectedly");
221        },
222        // Wait for the server to be stopped.
223        // This will be triggered by the signal handler.
224        reason = interrupt_rx.wait() => {
225            match reason {
226                Ok(termination::Interrupted::UserInt) => info!("Stopping server per user request"),
227                Ok(termination::Interrupted::OsSigInt) => info!("Stopping server because of an os sig int"),
228                Ok(termination::Interrupted::OsSigTerm) => info!("Stopping server because of an os sig term"),
229                Ok(termination::Interrupted::OsSigQuit) => info!("Stopping server because of an os sig quit"),
230                Err(e) => error!("Stopping server because of an unexpected error: {e}"),
231            }
232        }
233    }
234
235    #[cfg(feature = "dynamic_updates")]
236    guard.stop();
237
238    // send a shutdown event to all clients (ignore errors)
239    let _ = event_publisher_guard
240        .dispatcher
241        .read()
242        .await
243        .send(Message::Event(mecomp_core::udp::Event::DaemonShutdown))
244        .await;
245
246    log::info!("Cleanup complete, exiting...");
247
248    Ok(())
249}
250
251/// Initialize a test client, sends and receives messages over a channel / pipe.
252/// This is useful for testing the server without needing to start it.
253///
254/// # Errors
255///
256/// Errors if the event publisher cannot be created.
257#[inline]
258pub async fn init_test_client_server(
259    db: Arc<Surreal<Db>>,
260    settings: Arc<Settings>,
261    audio_kernel: Arc<AudioKernelSender>,
262) -> anyhow::Result<MusicPlayerClient> {
263    let (client_transport, server_transport) = tarpc::transport::channel::unbounded();
264
265    let event_publisher = Arc::new(RwLock::new(Sender::new().await?));
266    // initialize the termination handler
267    let (terminator, mut interrupt_rx) = termination::create_termination();
268    #[allow(clippy::redundant_pub_crate)]
269    tokio::spawn(async move {
270        let server = MusicPlayerServer::new(
271            db,
272            settings,
273            audio_kernel.clone(),
274            event_publisher.clone(),
275            terminator,
276            interrupt_rx.resubscribe(),
277        );
278        tokio::select! {
279            () = tarpc::server::BaseChannel::with_defaults(server_transport)
280                .execute(server.serve())
281                // Handle all requests concurrently.
282                .for_each(async |response| {
283                    tokio::spawn(response);
284                }) => {},
285            // Wait for the server to be stopped.
286            _ = interrupt_rx.wait() => {
287                // Stop the server.
288                info!("Stopping server...");
289                audio_kernel.send(AudioCommand::Exit);
290                let _ = event_publisher.read().await.send(Message::Event(mecomp_core::udp::Event::DaemonShutdown)).await;
291                info!("Server stopped");
292            }
293        }
294    });
295
296    // MusicPlayerClient is generated by the #[tarpc::service] attribute. It has a constructor `new`
297    // that takes a config and any Transport as input.
298    Ok(MusicPlayerClient::new(tarpc::client::Config::default(), client_transport).spawn())
299}
300
301#[cfg(test)]
302mod test_client_tests {
303    //! Tests for:
304    //! - the `init_test_client_server` function
305    //! - daemon endpoints that aren't covered in other tests
306
307    use std::io::{Read, Write};
308
309    use super::*;
310    use anyhow::Result;
311    use mecomp_core::{
312        errors::{BackupError, SerializableLibraryError},
313        state::library::LibraryFull,
314    };
315    use mecomp_storage::{
316        db::schemas::{
317            collection::Collection,
318            dynamic::{DynamicPlaylist, DynamicPlaylistChangeSet, query::Query},
319            playlist::Playlist,
320            song::SongChangeSet,
321        },
322        test_utils::{SongCase, create_song_with_overrides, init_test_database},
323    };
324
325    use pretty_assertions::{assert_eq, assert_str_eq};
326    use rstest::{fixture, rstest};
327
328    #[fixture]
329    async fn db() -> Arc<Surreal<Db>> {
330        let db = Arc::new(init_test_database().await.unwrap());
331
332        // create a test song, add it to a playlist and collection
333
334        let song_case = SongCase::new(0, vec![0], vec![0], 0, 0);
335
336        // Call the create_song function
337        let song = create_song_with_overrides(
338            &db,
339            song_case,
340            SongChangeSet {
341                // need to specify overrides so that items are created in the db
342                artist: Some(one_or_many::OneOrMany::One("Artist 0".into())),
343                album_artist: Some(one_or_many::OneOrMany::One("Artist 0".into())),
344                album: Some("Album 0".into()),
345                path: Some("/path/to/song.mp3".into()),
346                ..Default::default()
347            },
348        )
349        .await
350        .unwrap();
351
352        // create a playlist with the song
353        let playlist = Playlist {
354            id: Playlist::generate_id(),
355            name: "Playlist 0".into(),
356            runtime: song.runtime,
357            song_count: 1,
358        };
359
360        let result = Playlist::create(&db, playlist).await.unwrap().unwrap();
361
362        Playlist::add_songs(&db, result.id, vec![song.id.clone()])
363            .await
364            .unwrap();
365
366        // create a collection with the song
367        let collection = Collection {
368            id: Collection::generate_id(),
369            name: "Collection 0".into(),
370            runtime: song.runtime,
371            song_count: 1,
372        };
373
374        let result = Collection::create(&db, collection).await.unwrap().unwrap();
375
376        Collection::add_songs(&db, result.id, vec![song.id])
377            .await
378            .unwrap();
379
380        return db;
381    }
382
383    #[fixture]
384    async fn client(#[future] db: Arc<Surreal<Db>>) -> MusicPlayerClient {
385        let settings = Arc::new(Settings::default());
386        let (tx, _) = std::sync::mpsc::channel();
387        let audio_kernel = AudioKernelSender::start(tx);
388
389        init_test_client_server(db.await, settings, audio_kernel)
390            .await
391            .unwrap()
392    }
393
394    #[tokio::test]
395    async fn test_init_test_client_server() {
396        let db = Arc::new(init_test_database().await.unwrap());
397        let settings = Arc::new(Settings::default());
398        let (tx, _) = std::sync::mpsc::channel();
399        let audio_kernel = AudioKernelSender::start(tx);
400
401        let client = init_test_client_server(db, settings, audio_kernel)
402            .await
403            .unwrap();
404
405        let ctx = tarpc::context::current();
406        let response = client.ping(ctx).await.unwrap();
407
408        assert_eq!(response, "pong");
409
410        // ensure that the client is shutdown properly
411        drop(client);
412    }
413
414    #[rstest]
415    #[tokio::test]
416    async fn test_library_song_get_artist(#[future] client: MusicPlayerClient) -> Result<()> {
417        let client = client.await;
418
419        let ctx = tarpc::context::current();
420        let library_full: LibraryFull = client.library_full(ctx).await??;
421
422        let ctx = tarpc::context::current();
423        let response = client
424            .library_song_get_artist(ctx, library_full.songs.first().unwrap().id.clone().into())
425            .await?;
426
427        assert_eq!(response, library_full.artists.into_vec().into());
428
429        Ok(())
430    }
431
432    #[rstest]
433    #[tokio::test]
434    async fn test_library_song_get_album(#[future] client: MusicPlayerClient) -> Result<()> {
435        let client = client.await;
436
437        let ctx = tarpc::context::current();
438        let library_full: LibraryFull = client.library_full(ctx).await??;
439
440        let ctx = tarpc::context::current();
441        let response = client
442            .library_song_get_album(ctx, library_full.songs.first().unwrap().id.clone().into())
443            .await?
444            .unwrap();
445
446        assert_eq!(response, library_full.albums.first().unwrap().clone());
447
448        Ok(())
449    }
450
451    #[rstest]
452    #[tokio::test]
453    async fn test_library_song_get_playlists(#[future] client: MusicPlayerClient) -> Result<()> {
454        let client = client.await;
455
456        let ctx = tarpc::context::current();
457        let library_full: LibraryFull = client.library_full(ctx).await??;
458
459        let ctx = tarpc::context::current();
460        let response = client
461            .library_song_get_playlists(ctx, library_full.songs.first().unwrap().id.clone().into())
462            .await?;
463
464        assert_eq!(response, library_full.playlists.into_vec().into());
465
466        Ok(())
467    }
468
469    #[rstest]
470    #[tokio::test]
471    async fn test_library_album_get_artist(#[future] client: MusicPlayerClient) -> Result<()> {
472        let client = client.await;
473
474        let ctx = tarpc::context::current();
475        let library_full: LibraryFull = client.library_full(ctx).await??;
476
477        let ctx = tarpc::context::current();
478        let response = client
479            .library_album_get_artist(ctx, library_full.albums.first().unwrap().id.clone().into())
480            .await?;
481
482        assert_eq!(response, library_full.artists.into_vec().into());
483
484        Ok(())
485    }
486
487    #[rstest]
488    #[tokio::test]
489    async fn test_library_album_get_songs(#[future] client: MusicPlayerClient) -> Result<()> {
490        let client = client.await;
491
492        let ctx = tarpc::context::current();
493        let library_full: LibraryFull = client.library_full(ctx).await??;
494
495        let ctx = tarpc::context::current();
496        let response = client
497            .library_album_get_songs(ctx, library_full.albums.first().unwrap().id.clone().into())
498            .await?
499            .unwrap();
500
501        assert_eq!(response, library_full.songs);
502
503        Ok(())
504    }
505
506    #[rstest]
507    #[tokio::test]
508    async fn test_library_artist_get_songs(#[future] client: MusicPlayerClient) -> Result<()> {
509        let client = client.await;
510
511        let ctx = tarpc::context::current();
512        let library_full: LibraryFull = client.library_full(ctx).await??;
513
514        let ctx = tarpc::context::current();
515        let response = client
516            .library_artist_get_songs(ctx, library_full.artists.first().unwrap().id.clone().into())
517            .await?
518            .unwrap();
519
520        assert_eq!(response, library_full.songs);
521
522        Ok(())
523    }
524
525    #[rstest]
526    #[tokio::test]
527    async fn test_library_artist_get_albums(#[future] client: MusicPlayerClient) -> Result<()> {
528        let client = client.await;
529
530        let ctx = tarpc::context::current();
531        let library_full: LibraryFull = client.library_full(ctx).await??;
532
533        let ctx = tarpc::context::current();
534        let response = client
535            .library_artist_get_albums(ctx, library_full.artists.first().unwrap().id.clone().into())
536            .await?
537            .unwrap();
538
539        assert_eq!(response, library_full.albums);
540
541        Ok(())
542    }
543
544    #[rstest]
545    #[tokio::test]
546    async fn test_playback_volume_toggle_mute(#[future] client: MusicPlayerClient) -> Result<()> {
547        let client = client.await;
548
549        let ctx = tarpc::context::current();
550
551        client.playback_volume_toggle_mute(ctx).await?;
552        Ok(())
553    }
554
555    #[rstest]
556    #[tokio::test]
557    async fn test_playback_stop(#[future] client: MusicPlayerClient) -> Result<()> {
558        let client = client.await;
559
560        let ctx = tarpc::context::current();
561
562        client.playback_stop(ctx).await?;
563        Ok(())
564    }
565
566    #[rstest]
567    #[tokio::test]
568    async fn test_queue_add_list(#[future] client: MusicPlayerClient) -> Result<()> {
569        let client = client.await;
570
571        let ctx = tarpc::context::current();
572        let library_full: LibraryFull = client.library_full(ctx).await??;
573
574        let ctx = tarpc::context::current();
575        let response = client
576            .queue_add_list(
577                ctx,
578                vec![library_full.songs.first().unwrap().id.clone().into()],
579            )
580            .await?;
581
582        assert_eq!(response, Ok(()));
583
584        Ok(())
585    }
586
587    #[rstest]
588    #[case::get(String::from("Playlist 0"))]
589    #[case::create(String::from("Playlist 1"))]
590    #[tokio::test]
591    async fn test_playlist_get_or_create(
592        #[future] client: MusicPlayerClient,
593        #[case] name: String,
594    ) -> Result<()> {
595        let client = client.await;
596
597        let ctx = tarpc::context::current();
598
599        // get or create the playlist
600        let playlist_id = client
601            .playlist_get_or_create(ctx, name.clone())
602            .await?
603            .unwrap();
604
605        // now get that playlist
606        let ctx = tarpc::context::current();
607        let playlist = client.playlist_get(ctx, playlist_id).await?.unwrap();
608
609        assert_eq!(playlist.name, name);
610
611        Ok(())
612    }
613
614    #[rstest]
615    #[tokio::test]
616    async fn test_playlist_clone(#[future] client: MusicPlayerClient) -> Result<()> {
617        let client = client.await;
618
619        let ctx = tarpc::context::current();
620        let library_full: LibraryFull = client.library_full(ctx).await??;
621
622        // clone the only playlist in the db
623        let ctx = tarpc::context::current();
624        let playlist_id = client
625            .playlist_clone(
626                ctx,
627                library_full.playlists.first().unwrap().id.clone().into(),
628            )
629            .await?
630            .unwrap();
631
632        // now get that playlist
633        let ctx = tarpc::context::current();
634        let playlist = client.playlist_get(ctx, playlist_id).await?.unwrap();
635
636        assert_eq!(playlist.name, "Playlist 0 (copy)");
637
638        Ok(())
639    }
640
641    #[rstest]
642    #[tokio::test]
643    async fn test_playlist_get_songs(#[future] client: MusicPlayerClient) -> Result<()> {
644        let client = client.await;
645
646        let ctx = tarpc::context::current();
647        let library_full: LibraryFull = client.library_full(ctx).await??;
648
649        // clone the only playlist in the db
650        let response = client
651            .playlist_get_songs(
652                ctx,
653                library_full.playlists.first().unwrap().id.clone().into(),
654            )
655            .await?
656            .unwrap();
657
658        assert_eq!(response, library_full.songs);
659
660        Ok(())
661    }
662
663    #[rstest]
664    #[tokio::test]
665    async fn test_playlist_rename(#[future] client: MusicPlayerClient) -> Result<()> {
666        let client = client.await;
667
668        let ctx = tarpc::context::current();
669        let library_full: LibraryFull = client.library_full(ctx).await??;
670
671        let target = library_full.playlists.first().unwrap();
672
673        let ctx = tarpc::context::current();
674        let response = client
675            .playlist_rename(ctx, target.id.clone().into(), "New Name".into())
676            .await?;
677
678        let expected = Playlist {
679            name: "New Name".into(),
680            ..target.clone()
681        };
682
683        assert_eq!(response, Ok(expected.clone()));
684
685        let ctx = tarpc::context::current();
686        let response = client
687            .playlist_get(ctx, target.id.clone().into())
688            .await?
689            .unwrap();
690
691        assert_eq!(response, expected);
692        Ok(())
693    }
694
695    #[rstest]
696    #[tokio::test]
697    async fn test_collection_get_songs(#[future] client: MusicPlayerClient) -> Result<()> {
698        let client = client.await;
699
700        let ctx = tarpc::context::current();
701        let library_full: LibraryFull = client.library_full(ctx).await??;
702
703        // clone the only playlist in the db
704        let response = client
705            .collection_get_songs(
706                ctx,
707                library_full.collections.first().unwrap().id.clone().into(),
708            )
709            .await?
710            .unwrap();
711
712        assert_eq!(response, library_full.songs);
713
714        Ok(())
715    }
716
717    #[rstest]
718    #[tokio::test]
719    async fn test_dynamic_playlist_create(#[future] client: MusicPlayerClient) -> Result<()> {
720        let client = client.await;
721
722        let ctx = tarpc::context::current();
723
724        let query: Query = "artist CONTAINS \"Artist 0\"".parse()?;
725
726        let response = client
727            .dynamic_playlist_create(ctx, "Dynamic Playlist 0".into(), query)
728            .await?;
729
730        assert!(response.is_ok());
731
732        Ok(())
733    }
734
735    #[rstest]
736    #[tokio::test]
737    async fn test_dynamic_playlist_list(#[future] client: MusicPlayerClient) -> Result<()> {
738        let client = client.await;
739
740        let ctx = tarpc::context::current();
741
742        let query: Query = "artist CONTAINS \"Artist 0\"".parse()?;
743
744        let dynamic_playlist_id = client
745            .dynamic_playlist_create(ctx, "Dynamic Playlist 0".into(), query)
746            .await?
747            .unwrap();
748
749        let ctx = tarpc::context::current();
750        let response = client.dynamic_playlist_list(ctx).await?;
751
752        assert_eq!(response.len(), 1);
753        assert_eq!(response.first().unwrap().id, dynamic_playlist_id.into());
754
755        Ok(())
756    }
757
758    #[rstest]
759    #[tokio::test]
760    async fn test_dynamic_playlist_update(#[future] client: MusicPlayerClient) -> Result<()> {
761        let client = client.await;
762
763        let ctx = tarpc::context::current();
764
765        let query: Query = "artist CONTAINS \"Artist 0\"".parse()?;
766
767        let dynamic_playlist_id = client
768            .dynamic_playlist_create(ctx, "Dynamic Playlist 0".into(), query.clone())
769            .await?
770            .unwrap();
771
772        let ctx = tarpc::context::current();
773        let response = client
774            .dynamic_playlist_update(
775                ctx,
776                dynamic_playlist_id.clone(),
777                DynamicPlaylistChangeSet::new().name("Dynamic Playlist 1"),
778            )
779            .await?;
780
781        let expected = DynamicPlaylist {
782            id: dynamic_playlist_id.clone().into(),
783            name: "Dynamic Playlist 1".into(),
784            query: query.clone(),
785        };
786
787        assert_eq!(response, Ok(expected.clone()));
788
789        let ctx = tarpc::context::current();
790        let response = client
791            .dynamic_playlist_get(ctx, dynamic_playlist_id)
792            .await?
793            .unwrap();
794
795        assert_eq!(response, expected);
796
797        Ok(())
798    }
799
800    #[rstest]
801    #[tokio::test]
802    async fn test_dynamic_playlist_remove(#[future] client: MusicPlayerClient) -> Result<()> {
803        let client = client.await;
804
805        let ctx = tarpc::context::current();
806
807        let query: Query = "artist CONTAINS \"Artist 0\"".parse()?;
808
809        let dynamic_playlist_id = client
810            .dynamic_playlist_create(ctx, "Dynamic Playlist 0".into(), query)
811            .await?
812            .unwrap();
813
814        let ctx = tarpc::context::current();
815        let response = client
816            .dynamic_playlist_remove(ctx, dynamic_playlist_id)
817            .await?;
818
819        assert_eq!(response, Ok(()));
820
821        let ctx = tarpc::context::current();
822        let response = client.dynamic_playlist_list(ctx).await?;
823
824        assert_eq!(response.len(), 0);
825
826        Ok(())
827    }
828
829    #[rstest]
830    #[tokio::test]
831    async fn test_dynamic_playlist_get(#[future] client: MusicPlayerClient) -> Result<()> {
832        let client = client.await;
833
834        let ctx = tarpc::context::current();
835
836        let query: Query = "artist CONTAINS \"Artist 0\"".parse()?;
837
838        let dynamic_playlist_id = client
839            .dynamic_playlist_create(ctx, "Dynamic Playlist 0".into(), query.clone())
840            .await?
841            .unwrap();
842
843        let ctx = tarpc::context::current();
844        let response = client
845            .dynamic_playlist_get(ctx, dynamic_playlist_id)
846            .await?
847            .unwrap();
848
849        assert_eq!(response.name, "Dynamic Playlist 0");
850        assert_eq!(response.query, query);
851
852        Ok(())
853    }
854
855    #[rstest]
856    #[tokio::test]
857    async fn test_dynamic_playlist_get_songs(#[future] client: MusicPlayerClient) -> Result<()> {
858        let client = client.await;
859
860        let ctx = tarpc::context::current();
861
862        let query: Query = "artist CONTAINS \"Artist 0\"".parse()?;
863
864        let dynamic_playlist_id = client
865            .dynamic_playlist_create(ctx, "Dynamic Playlist 0".into(), query)
866            .await?
867            .unwrap();
868
869        let ctx = tarpc::context::current();
870        let response = client
871            .dynamic_playlist_get_songs(ctx, dynamic_playlist_id)
872            .await?
873            .unwrap();
874
875        assert_eq!(response.len(), 1);
876
877        Ok(())
878    }
879
880    // Dynamic Playlist Import Tests
881    #[rstest]
882    #[tokio::test]
883    async fn test_dynamic_playlist_import(#[future] client: MusicPlayerClient) -> Result<()> {
884        let client = client.await;
885
886        let tmpfile = tempfile::NamedTempFile::with_suffix("dps.csv")?;
887
888        // write a csv file to the temp file
889        let mut file = tmpfile.reopen()?;
890        writeln!(file, "dynamic playlist name,query")?;
891        writeln!(file, "Dynamic Playlist 0,artist CONTAINS \"Artist 0\"")?;
892
893        let tmpfile_path = tmpfile.path().to_path_buf();
894
895        let query: Query = "artist CONTAINS \"Artist 0\"".parse()?;
896
897        let ctx = tarpc::context::current();
898        let response = client.dynamic_playlist_import(ctx, tmpfile_path).await??;
899
900        let expected = DynamicPlaylist {
901            id: response[0].id.clone(),
902            name: "Dynamic Playlist 0".into(),
903            query: query.clone(),
904        };
905
906        assert_eq!(response, vec![expected]);
907
908        Ok(())
909    }
910    #[rstest]
911    #[tokio::test]
912    async fn test_dynamic_playlist_import_file_nonexistent(
913        #[future] client: MusicPlayerClient,
914    ) -> Result<()> {
915        let client = client.await;
916
917        let tmpfile = tempfile::NamedTempFile::with_suffix("dps.csv")?;
918
919        // write a csv file to the temp file
920        let mut file = tmpfile.reopen()?;
921        writeln!(file, "artist,album,album_artist,title")?;
922
923        let tmpfile_path = "/this/path/does/not/exist.csv";
924
925        let ctx = tarpc::context::current();
926        let response = client
927            .dynamic_playlist_import(ctx, tmpfile_path.into())
928            .await?;
929        assert!(response.is_err(), "response: {response:?}");
930        assert_eq!(
931            response.unwrap_err().to_string(),
932            format!("Backup Error: The file \"{tmpfile_path}\" does not exist")
933        );
934        Ok(())
935    }
936    #[rstest]
937    #[tokio::test]
938    async fn test_dynamic_playlist_import_file_wrong_extension(
939        #[future] client: MusicPlayerClient,
940    ) -> Result<()> {
941        let client = client.await;
942
943        let tmpfile = tempfile::NamedTempFile::with_suffix("dps.txt")?;
944
945        // write a csv file to the temp file
946        let mut file = tmpfile.reopen()?;
947        writeln!(file, "artist,album,album_artist,title")?;
948
949        let tmpfile_path = tmpfile.path().to_path_buf();
950
951        let ctx = tarpc::context::current();
952        let response = client
953            .dynamic_playlist_import(ctx, tmpfile_path.clone())
954            .await?;
955        assert!(response.is_err(), "response: {response:?}");
956        assert_eq!(
957            response.unwrap_err().to_string(),
958            format!(
959                "Backup Error: The file \"{}\" has the wrong extension, expected: csv",
960                tmpfile_path.display()
961            )
962        );
963        Ok(())
964    }
965    #[rstest]
966    #[tokio::test]
967    async fn test_dynamic_playlist_import_file_is_directory(
968        #[future] client: MusicPlayerClient,
969    ) -> Result<()> {
970        let client = client.await;
971
972        let tmpfile = tempfile::tempdir()?;
973
974        let tmpfile_path = tmpfile.path().to_path_buf();
975
976        let ctx = tarpc::context::current();
977        let response = client
978            .dynamic_playlist_import(ctx, tmpfile_path.clone())
979            .await?;
980        assert!(response.is_err());
981        assert_eq!(
982            response.unwrap_err().to_string(),
983            format!(
984                "Backup Error: {} is a directory, not a file",
985                tmpfile_path.display()
986            )
987        );
988        Ok(())
989    }
990    #[rstest]
991    #[tokio::test]
992    async fn test_dynamic_playlist_import_file_invalid_format(
993        #[future] client: MusicPlayerClient,
994    ) -> Result<()> {
995        let client = client.await;
996
997        let tmpfile = tempfile::NamedTempFile::with_suffix("dps.csv")?;
998
999        // write a csv file to the temp file
1000        let mut file = tmpfile.reopen()?;
1001        writeln!(file, "artist,album,album_artist,title")?;
1002
1003        let tmpfile_path = tmpfile.path().to_path_buf();
1004
1005        let ctx = tarpc::context::current();
1006        let response = client.dynamic_playlist_import(ctx, tmpfile_path).await?;
1007        assert!(response.is_err());
1008        assert_eq!(
1009            response.unwrap_err().to_string(),
1010            "Backup Error: No valid playlists were found in the csv file."
1011        );
1012        Ok(())
1013    }
1014    #[rstest]
1015    #[tokio::test]
1016    async fn test_dynamic_playlist_import_file_invalid_query(
1017        #[future] client: MusicPlayerClient,
1018    ) -> Result<()> {
1019        let client = client.await;
1020
1021        let tmpfile = tempfile::NamedTempFile::with_suffix("dps.csv")?;
1022
1023        // write a csv file to the temp file
1024        let mut file = tmpfile.reopen()?;
1025        writeln!(file, "dynamic playlist name,query")?;
1026        writeln!(file, "Dynamic Playlist 0,artist CONTAINS \"Artist 0\"")?;
1027        writeln!(file, "Dynamic Playlist 1,artist CONTAINS \"")?;
1028
1029        let tmpfile_path = tmpfile.path().to_path_buf();
1030
1031        let ctx = tarpc::context::current();
1032        let response = client.dynamic_playlist_import(ctx, tmpfile_path).await?;
1033        assert!(
1034            matches!(
1035                response,
1036                Err(SerializableLibraryError::BackupError(
1037                    BackupError::InvalidDynamicPlaylistQuery(_, 2)
1038                ))
1039            ),
1040            "response: {response:?}"
1041        );
1042        Ok(())
1043    }
1044
1045    // Dynamic Playlist Export Tests
1046    #[rstest]
1047    #[tokio::test]
1048    async fn test_dynamic_playlist_export(#[future] client: MusicPlayerClient) -> Result<()> {
1049        let client = client.await;
1050
1051        let tmpdir = tempfile::tempdir()?;
1052        let path = tmpdir.path().join("test.csv");
1053
1054        let query: Query = "artist CONTAINS \"Artist 0\"".parse()?;
1055        let ctx = tarpc::context::current();
1056        let _ = client
1057            .dynamic_playlist_create(ctx, "Dynamic Playlist 0".into(), query.clone())
1058            .await?
1059            .unwrap();
1060
1061        let expected = r#"dynamic playlist name,query
1062Dynamic Playlist 0,"artist CONTAINS ""Artist 0"""
1063"#;
1064
1065        let response = client.dynamic_playlist_export(ctx, path.clone()).await?;
1066        assert_eq!(response, Ok(()));
1067
1068        let mut file = std::fs::File::open(path.clone())?;
1069        let mut contents = String::new();
1070        file.read_to_string(&mut contents)?;
1071        assert_str_eq!(contents, expected);
1072
1073        Ok(())
1074    }
1075    #[rstest]
1076    #[tokio::test]
1077    async fn test_dynamic_playlist_export_file_exists(
1078        #[future] client: MusicPlayerClient,
1079    ) -> Result<()> {
1080        let client = client.await;
1081
1082        let tmpfile = tempfile::NamedTempFile::with_suffix("dps.csv")?;
1083
1084        let ctx = tarpc::context::current();
1085        let response = client
1086            .dynamic_playlist_export(ctx, tmpfile.path().to_path_buf())
1087            .await?;
1088        assert!(matches!(response, Ok(())), "response: {response:?}");
1089        Ok(())
1090    }
1091    #[rstest]
1092    #[tokio::test]
1093    async fn test_dynamic_playlist_export_file_is_directory(
1094        #[future] client: MusicPlayerClient,
1095    ) -> Result<()> {
1096        let client = client.await;
1097
1098        let tmpfile = tempfile::tempdir()?;
1099
1100        let ctx = tarpc::context::current();
1101        let response = client
1102            .dynamic_playlist_export(ctx, tmpfile.path().to_path_buf())
1103            .await?;
1104        assert!(
1105            matches!(
1106                response,
1107                Err(SerializableLibraryError::BackupError(
1108                    BackupError::PathIsDirectory(_)
1109                ))
1110            ),
1111            "response: {response:?}"
1112        );
1113        Ok(())
1114    }
1115    #[rstest]
1116    #[tokio::test]
1117    async fn test_dynamic_playlist_export_file_invalid_extension(
1118        #[future] client: MusicPlayerClient,
1119    ) -> Result<()> {
1120        let client = client.await;
1121
1122        let tmpfile = tempfile::NamedTempFile::with_suffix("dps.txt")?;
1123
1124        let ctx = tarpc::context::current();
1125        let response = client
1126            .dynamic_playlist_export(ctx, tmpfile.path().to_path_buf())
1127            .await?;
1128        assert!(response.is_err(), "response: {response:?}");
1129        let err = response.unwrap_err();
1130        assert!(
1131            matches!(
1132                &err,
1133                SerializableLibraryError::BackupError(
1134                    BackupError::WrongExtension(_, expected_extension)
1135                ) if expected_extension == "csv"
1136            ),
1137            "response: {err:?}"
1138        );
1139
1140        Ok(())
1141    }
1142
1143    // Playlist import test
1144    #[rstest]
1145    #[tokio::test]
1146    async fn test_playlist_import(#[future] client: MusicPlayerClient) -> Result<()> {
1147        let client = client.await;
1148
1149        let tmpfile = tempfile::NamedTempFile::with_suffix("pl.m3u")?;
1150
1151        // write a csv file to the temp file
1152        let mut file = tmpfile.reopen()?;
1153        write!(
1154            file,
1155            r"#EXTM3U
1156#EXTINF:123,Sample Artist - Sample title
1157/path/to/song.mp3
1158"
1159        )?;
1160
1161        let tmpfile_path = tmpfile.path().to_path_buf();
1162
1163        let ctx = tarpc::context::current();
1164        let response = client.playlist_import(ctx, tmpfile_path, None).await?;
1165        assert!(response.is_ok());
1166        let response = response.unwrap();
1167
1168        let ctx = tarpc::context::current();
1169        let playlist = client.playlist_get(ctx, response.clone()).await?.unwrap();
1170
1171        assert_eq!(playlist.name, "Imported Playlist");
1172        assert_eq!(playlist.song_count, 1);
1173
1174        let ctx = tarpc::context::current();
1175        let songs = client
1176            .playlist_get_songs(ctx, response.clone())
1177            .await?
1178            .unwrap();
1179        assert_eq!(songs.len(), 1);
1180        assert_eq!(songs[0].path.to_string_lossy(), "/path/to/song.mp3");
1181
1182        Ok(())
1183    }
1184
1185    #[rstest]
1186    #[tokio::test]
1187    async fn test_playlist_export(#[future] client: MusicPlayerClient) -> Result<()> {
1188        let client = client.await;
1189
1190        let tmpdir = tempfile::tempdir()?;
1191        let path = tmpdir.path().join("test.m3u");
1192
1193        let ctx = tarpc::context::current();
1194        let library_full: LibraryFull = client.library_full(ctx).await??;
1195
1196        let playlist = library_full.playlists[0].clone();
1197
1198        let response = client
1199            .playlist_export(ctx, playlist.id.clone().into(), path.clone())
1200            .await?;
1201        assert_eq!(response, Ok(()));
1202
1203        let mut file = std::fs::File::open(path.clone())?;
1204        let mut contents = String::new();
1205        file.read_to_string(&mut contents)?;
1206        assert_str_eq!(
1207            contents,
1208            r"#EXTM3U
1209
1210#PLAYLIST:Playlist 0
1211
1212#EXTINF:120,Song 0 - Artist 0
1213#EXTGENRE:Genre 0
1214#EXTALB:Artist 0
1215/path/to/song.mp3
1216
1217"
1218        );
1219
1220        Ok(())
1221    }
1222}