mecomp_daemon/
lib.rs

1#![deny(clippy::missing_inline_in_public_items)]
2
3//----------------------------------------------------------------------------------------- std lib
4use std::{
5    net::{IpAddr, Ipv4Addr},
6    path::PathBuf,
7    sync::Arc,
8};
9//--------------------------------------------------------------------------------- other libraries
10use futures::{future, prelude::*};
11use log::{error, info};
12use persistence::QueueState;
13use surrealdb::{Surreal, engine::local::Db};
14use tarpc::{
15    self,
16    serde_transport::tcp,
17    server::{BaseChannel, Channel as _, incoming::Incoming as _},
18    tokio_serde::formats::Json,
19};
20use tokio::{runtime::Handle, sync::RwLock};
21use tracing::Instrument;
22//-------------------------------------------------------------------------------- MECOMP libraries
23use mecomp_core::{
24    audio::{AudioKernelSender, commands::AudioCommand},
25    config::Settings,
26    is_server_running,
27    logger::{init_logger, init_tracing},
28    rpc::{MusicPlayer as _, MusicPlayerClient},
29    udp::{Message, Sender, StateChange},
30};
31use mecomp_storage::db::{init_database, set_database_path};
32
33async fn spawn<F>(fut: F)
34where
35    F: Future<Output = ()> + Send + 'static,
36{
37    if size_of::<F>() > 1024 {
38        // if the future is too big, box it before spawning
39        let fut = Box::pin(fut);
40        tokio::spawn(fut);
41    } else {
42        // if the future is small enough, spawn it directly
43        tokio::spawn(fut);
44    }
45}
46
47pub mod controller;
48#[cfg(feature = "dynamic_updates")]
49pub mod dynamic_updates;
50pub mod persistence;
51pub mod services;
52pub mod termination;
53#[cfg(test)]
54pub use mecomp_core::test_utils;
55
56use crate::{controller::MusicPlayerServer, termination::InterruptReceiver};
57
58/// The maximum number of RPC channels accepted per client IP address.
59const CHANNELS_PER_IP: u32 = 10;
60/// The maximum number of concurrent requests (the `buffer_unordered` limit used by `run_daemon`).
61pub const MAX_CONCURRENT_REQUESTS: usize = 4;
62
63/// Event Publisher guard
64///
65/// This is a newtype for the event publisher that ensures it is stopped when the guard is dropped.
66struct EventPublisher {
67    dispatcher: Arc<RwLock<Sender<Message>>>,
68    event_tx: std::sync::mpsc::Sender<StateChange>,
69    handle: tokio::task::JoinHandle<()>,
70}
71
72impl EventPublisher {
73    /// Start the event publisher
74    pub async fn new() -> Self {
75        let (event_tx, event_rx) = std::sync::mpsc::channel();
76        let event_publisher = Arc::new(RwLock::const_new(Sender::new().await.unwrap()));
77        let event_publisher_clone = event_publisher.clone();
78
79        let handle = tokio::task::spawn_blocking(move || {
80            while let Ok(event) = event_rx.recv() {
81                // re-enter the async context to send the event over UDP
82                Handle::current().block_on(async {
83                    if let Err(e) = event_publisher_clone
84                        .read()
85                        .await
86                        .send(Message::StateChange(event))
87                        .await
88                    {
89                        error!("Failed to send event over UDP: {e}");
90                    }
91                });
92            }
93        })
94        .instrument(tracing::info_span!("event_publisher"));
95
96        Self {
97            dispatcher: event_publisher,
98            event_tx,
99            handle: handle.into_inner(),
100        }
101    }
102}
103
104impl Drop for EventPublisher {
105    fn drop(&mut self) {
106        // Stop the event publisher thread
107        self.handle.abort();
108    }
109}
110
111// TODO: at some point, we should probably add a panic handler to the daemon to ensure graceful shutdown.
112
113/// Run the daemon
114///
115/// also initializes the logger, database, and other necessary components.
116///
117/// # Arguments
118///
119/// * `settings` - The settings to use.
120/// * `db_dir` - The directory where the database is stored.
121///   If the directory does not exist, it will be created.
122/// * `log_file_path` - The path to the file where logs will be written.
123/// * `state_file_path` - The path to the file where the queue state restored from / saved to.
124///
125/// # Errors
126///
127/// If the daemon cannot be started, an error is returned.
128///
129/// # Panics
130///
131/// Panics if the peer address of the underlying TCP transport cannot be determined.
132#[inline]
133#[allow(clippy::redundant_pub_crate)]
134pub async fn start_daemon(
135    settings: Settings,
136    db_dir: PathBuf,
137    log_file_path: Option<PathBuf>,
138    state_file_path: Option<PathBuf>,
139) -> anyhow::Result<()> {
140    // Throw the given settings into an Arc so we can share settings across threads.
141    let settings = Arc::new(settings);
142
143    // check if a server is already running
144    if is_server_running(settings.daemon.rpc_port) {
145        anyhow::bail!(
146            "A server is already running on port {}",
147            settings.daemon.rpc_port
148        );
149    }
150
151    // Initialize the logger, database, and tracing.
152    init_logger(settings.daemon.log_level, log_file_path);
153    set_database_path(db_dir)?;
154    let db = Arc::new(init_database().await?);
155    tracing::subscriber::set_global_default(init_tracing())?;
156
157    // initialize the termination handler
158    let (terminator, interrupt_rx) = termination::create_termination();
159
160    // Start the music library watcher.
161    #[cfg(feature = "dynamic_updates")]
162    let guard = dynamic_updates::init_music_library_watcher(
163        db.clone(),
164        &settings.daemon.library_paths,
165        settings.daemon.artist_separator.clone(),
166        settings.daemon.protected_artist_names.clone(),
167        settings.daemon.genre_separator.clone(),
168        interrupt_rx.resubscribe(),
169    )?;
170
171    // initialize the event publisher
172    let event_publisher_guard = EventPublisher::new().await;
173
174    // Start the audio kernel.
175    let audio_kernel = AudioKernelSender::start(event_publisher_guard.event_tx.clone());
176
177    // optionally restore the queue state
178    if let Some(state_path) = &state_file_path {
179        info!("Restoring queue state from {}", state_path.display());
180        if let Err(e) =
181            QueueState::load_from_file(state_path).map(|state| state.restore_to(&audio_kernel))
182        {
183            error!("Failed to restore queue state: {e}");
184        }
185    }
186
187    // Initialize the server.
188    let server = MusicPlayerServer::new(
189        db.clone(),
190        settings.clone(),
191        audio_kernel.clone(),
192        event_publisher_guard.dispatcher.clone(),
193        terminator.clone(),
194        interrupt_rx.resubscribe(),
195    );
196
197    // Start the daemon server.
198    if let Err(e) = run_daemon(server, settings, interrupt_rx.resubscribe()).await {
199        error!("Error running daemon: {e}");
200    }
201
202    #[cfg(feature = "dynamic_updates")]
203    guard.stop();
204
205    // send a shutdown event to all clients (ignore errors)
206    let _ = event_publisher_guard
207        .dispatcher
208        .read()
209        .await
210        .send(Message::Event(mecomp_core::udp::Event::DaemonShutdown))
211        .await;
212
213    if let Some(state_path) = &state_file_path {
214        info!("Persisting queue state to {}", state_path.display());
215        if let Err(e) = QueueState::retrieve(&audio_kernel)
216            .await
217            .and_then(|state| state.save_to_file(state_path))
218        {
219            error!("Failed to persist queue state: {e}");
220        }
221    }
222
223    log::info!("Cleanup complete, exiting...");
224
225    Ok(())
226}
227
228/// Run the daemon with the given settings, database, and state file path.
229/// Does not handle setup or teardown, just runs the server.
230async fn run_daemon(
231    server: MusicPlayerServer,
232    settings: Arc<Settings>,
233    mut interrupt_rx: InterruptReceiver,
234) -> anyhow::Result<()> {
235    // Start the RPC server.
236    let server_addr = (IpAddr::V4(Ipv4Addr::LOCALHOST), settings.daemon.rpc_port);
237
238    let mut listener = match tcp::listen(&server_addr, Json::default).await {
239        Ok(listener) => listener,
240        Err(e) => {
241            error!("Failed to start server: {e}");
242            return Err(anyhow::anyhow!("Failed to start server: {e}"));
243        }
244    };
245    info!("Listening on {}", listener.local_addr());
246    listener.config_mut().max_frame_length(usize::MAX);
247    let server_handle = listener
248        // Ignore accept errors.
249        .filter_map(|r| future::ready(r.ok()))
250        .map(BaseChannel::with_defaults)
251        // Limit channels to 10 per IP.
252        .max_channels_per_key(CHANNELS_PER_IP, |t| t.transport().peer_addr().unwrap().ip())
253        // Set up the server's handling of incoming connections.
254        // serve is generated by the service attribute.
255        // It takes as input any type implementing the generated MusicPlayer trait.
256        .map(|channel| channel.execute(server.clone().serve()).for_each(spawn))
257        // Max 4 channels.
258        // this means that we will only process 4 requests at a time
259        // NOTE: if we have issues with concurrency (e.g. deadlocks or data-races),
260        //       and have too much of a skill issue to fix it, we can set this number to 1.
261        .buffer_unordered(MAX_CONCURRENT_REQUESTS)
262        .for_each(async |()| {})
263        // make it fused so we can stop it later
264        .fuse();
265
266    // run the server until it is terminated
267    tokio::select! {
268        () = server_handle => {
269            error!("Server stopped unexpectedly");
270        },
271        // Wait for the server to be stopped.
272        // This will be triggered by the signal handler.
273        reason = interrupt_rx.wait() => {
274            match reason {
275                Ok(termination::Interrupted::UserInt) => info!("Stopping server per user request"),
276                Ok(termination::Interrupted::OsSigInt) => info!("Stopping server because of an os sig int"),
277                Ok(termination::Interrupted::OsSigTerm) => info!("Stopping server because of an os sig term"),
278                Ok(termination::Interrupted::OsSigQuit) => info!("Stopping server because of an os sig quit"),
279                Err(e) => error!("Stopping server because of an unexpected error: {e}"),
280            }
281        }
282    }
283
284    Ok(())
285}
286
287/// Initialize a test client, sends and receives messages over a channel / pipe.
288/// This is useful for testing the server without needing to start it.
289///
290/// # Errors
291///
292/// Errors if the event publisher cannot be created.
293#[inline]
294pub async fn init_test_client_server(
295    db: Arc<Surreal<Db>>,
296    settings: Arc<Settings>,
297    audio_kernel: Arc<AudioKernelSender>,
298) -> anyhow::Result<MusicPlayerClient> {
299    let (client_transport, server_transport) = tarpc::transport::channel::unbounded();
300
301    let event_publisher = Arc::new(RwLock::new(Sender::new().await?));
302    // initialize the termination handler
303    let (terminator, mut interrupt_rx) = termination::create_termination();
304    #[allow(clippy::redundant_pub_crate)]
305    tokio::spawn(async move {
306        let server = MusicPlayerServer::new(
307            db,
308            settings,
309            audio_kernel.clone(),
310            event_publisher.clone(),
311            terminator,
312            interrupt_rx.resubscribe(),
313        );
314        tokio::select! {
315            () = tarpc::server::BaseChannel::with_defaults(server_transport)
316                .execute(server.serve())
317                // Handle all requests concurrently.
318                .for_each(async |response| {
319                    tokio::spawn(response);
320                }) => {},
321            // Wait for the server to be stopped.
322            _ = interrupt_rx.wait() => {
323                // Stop the server.
324                info!("Stopping server...");
325                audio_kernel.send(AudioCommand::Exit);
326                let _ = event_publisher.read().await.send(Message::Event(mecomp_core::udp::Event::DaemonShutdown)).await;
327                info!("Server stopped");
328            }
329        }
330    });
331
332    // MusicPlayerClient is generated by the #[tarpc::service] attribute. It has a constructor `new`
333    // that takes a config and any Transport as input.
334    Ok(MusicPlayerClient::new(tarpc::client::Config::default(), client_transport).spawn())
335}
336
337#[cfg(test)]
338mod test_client_tests {
339    //! Tests for:
340    //! - the `init_test_client_server` function
341    //! - daemon endpoints that aren't covered in other tests
342
343    use std::io::{Read, Write};
344
345    use super::*;
346    use anyhow::Result;
347    use mecomp_core::{
348        errors::{BackupError, SerializableLibraryError},
349        state::library::LibraryFull,
350    };
351    use mecomp_storage::{
352        db::schemas::{
353            collection::Collection,
354            dynamic::{DynamicPlaylist, DynamicPlaylistChangeSet, query::Query},
355            playlist::Playlist,
356            song::SongChangeSet,
357        },
358        test_utils::{SongCase, create_song_with_overrides, init_test_database},
359    };
360
361    use pretty_assertions::{assert_eq, assert_str_eq};
362    use rstest::{fixture, rstest};
363
364    #[fixture]
365    async fn db() -> Arc<Surreal<Db>> {
366        let db = Arc::new(init_test_database().await.unwrap());
367
368        // create a test song, add it to a playlist and collection
369
370        let song_case = SongCase::new(0, vec![0], vec![0], 0, 0);
371
372        // Call the create_song function
373        let song = create_song_with_overrides(
374            &db,
375            song_case,
376            SongChangeSet {
377                // need to specify overrides so that items are created in the db
378                artist: Some("Artist 0".to_string().into()),
379                album_artist: Some("Artist 0".to_string().into()),
380                album: Some("Album 0".into()),
381                path: Some("/path/to/song.mp3".into()),
382                ..Default::default()
383            },
384        )
385        .await
386        .unwrap();
387
388        // create a playlist with the song
389        let playlist = Playlist {
390            id: Playlist::generate_id(),
391            name: "Playlist 0".into(),
392            runtime: song.runtime,
393            song_count: 1,
394        };
395
396        let result = Playlist::create(&db, playlist).await.unwrap().unwrap();
397
398        Playlist::add_songs(&db, result.id, vec![song.id.clone()])
399            .await
400            .unwrap();
401
402        // create a collection with the song
403        let collection = Collection {
404            id: Collection::generate_id(),
405            name: "Collection 0".into(),
406            runtime: song.runtime,
407            song_count: 1,
408        };
409
410        let result = Collection::create(&db, collection).await.unwrap().unwrap();
411
412        Collection::add_songs(&db, result.id, vec![song.id])
413            .await
414            .unwrap();
415
416        return db;
417    }
418
419    #[fixture]
420    async fn client(#[future] db: Arc<Surreal<Db>>) -> MusicPlayerClient {
421        let settings = Arc::new(Settings::default());
422        let (tx, _) = std::sync::mpsc::channel();
423        let audio_kernel = AudioKernelSender::start(tx);
424
425        init_test_client_server(db.await, settings, audio_kernel)
426            .await
427            .unwrap()
428    }
429
430    #[tokio::test]
431    async fn test_init_test_client_server() {
432        let db = Arc::new(init_test_database().await.unwrap());
433        let settings = Arc::new(Settings::default());
434        let (tx, _) = std::sync::mpsc::channel();
435        let audio_kernel = AudioKernelSender::start(tx);
436
437        let client = init_test_client_server(db, settings, audio_kernel)
438            .await
439            .unwrap();
440
441        let ctx = tarpc::context::current();
442        let response = client.ping(ctx).await.unwrap();
443
444        assert_eq!(response, "pong");
445
446        // ensure that the client is shutdown properly
447        drop(client);
448    }
449
450    #[rstest]
451    #[tokio::test]
452    async fn test_library_song_get_artist(#[future] client: MusicPlayerClient) -> Result<()> {
453        let client = client.await;
454
455        let ctx = tarpc::context::current();
456        let library_full: LibraryFull = client.library_full(ctx).await??;
457
458        let ctx = tarpc::context::current();
459        let response = client
460            .library_song_get_artist(ctx, library_full.songs.first().unwrap().id.clone().into())
461            .await?;
462
463        assert_eq!(response, library_full.artists.into_vec().into());
464
465        Ok(())
466    }
467
468    #[rstest]
469    #[tokio::test]
470    async fn test_library_song_get_album(#[future] client: MusicPlayerClient) -> Result<()> {
471        let client = client.await;
472
473        let ctx = tarpc::context::current();
474        let library_full: LibraryFull = client.library_full(ctx).await??;
475
476        let ctx = tarpc::context::current();
477        let response = client
478            .library_song_get_album(ctx, library_full.songs.first().unwrap().id.clone().into())
479            .await?
480            .unwrap();
481
482        assert_eq!(response, library_full.albums.first().unwrap().clone());
483
484        Ok(())
485    }
486
487    #[rstest]
488    #[tokio::test]
489    async fn test_library_song_get_playlists(#[future] client: MusicPlayerClient) -> Result<()> {
490        let client = client.await;
491
492        let ctx = tarpc::context::current();
493        let library_full: LibraryFull = client.library_full(ctx).await??;
494
495        let ctx = tarpc::context::current();
496        let response = client
497            .library_song_get_playlists(ctx, library_full.songs.first().unwrap().id.clone().into())
498            .await?;
499
500        assert_eq!(response, library_full.playlists.into_vec().into());
501
502        Ok(())
503    }
504
505    #[rstest]
506    #[tokio::test]
507    async fn test_library_album_get_artist(#[future] client: MusicPlayerClient) -> Result<()> {
508        let client = client.await;
509
510        let ctx = tarpc::context::current();
511        let library_full: LibraryFull = client.library_full(ctx).await??;
512
513        let ctx = tarpc::context::current();
514        let response = client
515            .library_album_get_artist(ctx, library_full.albums.first().unwrap().id.clone().into())
516            .await?;
517
518        assert_eq!(response, library_full.artists.into_vec().into());
519
520        Ok(())
521    }
522
523    #[rstest]
524    #[tokio::test]
525    async fn test_library_album_get_songs(#[future] client: MusicPlayerClient) -> Result<()> {
526        let client = client.await;
527
528        let ctx = tarpc::context::current();
529        let library_full: LibraryFull = client.library_full(ctx).await??;
530
531        let ctx = tarpc::context::current();
532        let response = client
533            .library_album_get_songs(ctx, library_full.albums.first().unwrap().id.clone().into())
534            .await?
535            .unwrap();
536
537        assert_eq!(response, library_full.songs);
538
539        Ok(())
540    }
541
542    #[rstest]
543    #[tokio::test]
544    async fn test_library_artist_get_songs(#[future] client: MusicPlayerClient) -> Result<()> {
545        let client = client.await;
546
547        let ctx = tarpc::context::current();
548        let library_full: LibraryFull = client.library_full(ctx).await??;
549
550        let ctx = tarpc::context::current();
551        let response = client
552            .library_artist_get_songs(ctx, library_full.artists.first().unwrap().id.clone().into())
553            .await?
554            .unwrap();
555
556        assert_eq!(response, library_full.songs);
557
558        Ok(())
559    }
560
561    #[rstest]
562    #[tokio::test]
563    async fn test_library_artist_get_albums(#[future] client: MusicPlayerClient) -> Result<()> {
564        let client = client.await;
565
566        let ctx = tarpc::context::current();
567        let library_full: LibraryFull = client.library_full(ctx).await??;
568
569        let ctx = tarpc::context::current();
570        let response = client
571            .library_artist_get_albums(ctx, library_full.artists.first().unwrap().id.clone().into())
572            .await?
573            .unwrap();
574
575        assert_eq!(response, library_full.albums);
576
577        Ok(())
578    }
579
580    #[rstest]
581    #[tokio::test]
582    async fn test_playback_volume_toggle_mute(#[future] client: MusicPlayerClient) -> Result<()> {
583        let client = client.await;
584
585        let ctx = tarpc::context::current();
586
587        client.playback_volume_toggle_mute(ctx).await?;
588        Ok(())
589    }
590
591    #[rstest]
592    #[tokio::test]
593    async fn test_playback_stop(#[future] client: MusicPlayerClient) -> Result<()> {
594        let client = client.await;
595
596        let ctx = tarpc::context::current();
597
598        client.playback_stop(ctx).await?;
599        Ok(())
600    }
601
602    #[rstest]
603    #[tokio::test]
604    async fn test_queue_add_list(#[future] client: MusicPlayerClient) -> Result<()> {
605        let client = client.await;
606
607        let ctx = tarpc::context::current();
608        let library_full: LibraryFull = client.library_full(ctx).await??;
609
610        let ctx = tarpc::context::current();
611        let response = client
612            .queue_add_list(
613                ctx,
614                vec![library_full.songs.first().unwrap().id.clone().into()],
615            )
616            .await?;
617
618        assert_eq!(response, Ok(()));
619
620        Ok(())
621    }
622
623    #[rstest]
624    #[case::get(String::from("Playlist 0"))]
625    #[case::create(String::from("Playlist 1"))]
626    #[tokio::test]
627    async fn test_playlist_get_or_create(
628        #[future] client: MusicPlayerClient,
629        #[case] name: String,
630    ) -> Result<()> {
631        let client = client.await;
632
633        let ctx = tarpc::context::current();
634
635        // get or create the playlist
636        let playlist_id = client
637            .playlist_get_or_create(ctx, name.clone())
638            .await?
639            .unwrap();
640
641        // now get that playlist
642        let ctx = tarpc::context::current();
643        let playlist = client.playlist_get(ctx, playlist_id).await?.unwrap();
644
645        assert_eq!(playlist.name, name);
646
647        Ok(())
648    }
649
650    #[rstest]
651    #[tokio::test]
652    async fn test_playlist_clone(#[future] client: MusicPlayerClient) -> Result<()> {
653        let client = client.await;
654
655        let ctx = tarpc::context::current();
656        let library_full: LibraryFull = client.library_full(ctx).await??;
657
658        // clone the only playlist in the db
659        let ctx = tarpc::context::current();
660        let playlist_id = client
661            .playlist_clone(
662                ctx,
663                library_full.playlists.first().unwrap().id.clone().into(),
664            )
665            .await?
666            .unwrap();
667
668        // now get that playlist
669        let ctx = tarpc::context::current();
670        let playlist = client.playlist_get(ctx, playlist_id).await?.unwrap();
671
672        assert_eq!(playlist.name, "Playlist 0 (copy)");
673
674        Ok(())
675    }
676
677    #[rstest]
678    #[tokio::test]
679    async fn test_playlist_get_songs(#[future] client: MusicPlayerClient) -> Result<()> {
680        let client = client.await;
681
682        let ctx = tarpc::context::current();
683        let library_full: LibraryFull = client.library_full(ctx).await??;
684
685        // clone the only playlist in the db
686        let response = client
687            .playlist_get_songs(
688                ctx,
689                library_full.playlists.first().unwrap().id.clone().into(),
690            )
691            .await?
692            .unwrap();
693
694        assert_eq!(response, library_full.songs);
695
696        Ok(())
697    }
698
699    #[rstest]
700    #[tokio::test]
701    async fn test_playlist_rename(#[future] client: MusicPlayerClient) -> Result<()> {
702        let client = client.await;
703
704        let ctx = tarpc::context::current();
705        let library_full: LibraryFull = client.library_full(ctx).await??;
706
707        let target = library_full.playlists.first().unwrap();
708
709        let ctx = tarpc::context::current();
710        let response = client
711            .playlist_rename(ctx, target.id.clone().into(), "New Name".into())
712            .await?;
713
714        let expected = Playlist {
715            name: "New Name".into(),
716            ..target.clone()
717        };
718
719        assert_eq!(response, Ok(expected.clone()));
720
721        let ctx = tarpc::context::current();
722        let response = client
723            .playlist_get(ctx, target.id.clone().into())
724            .await?
725            .unwrap();
726
727        assert_eq!(response, expected);
728        Ok(())
729    }
730
731    #[rstest]
732    #[tokio::test]
733    async fn test_collection_get_songs(#[future] client: MusicPlayerClient) -> Result<()> {
734        let client = client.await;
735
736        let ctx = tarpc::context::current();
737        let library_full: LibraryFull = client.library_full(ctx).await??;
738
739        // clone the only playlist in the db
740        let response = client
741            .collection_get_songs(
742                ctx,
743                library_full.collections.first().unwrap().id.clone().into(),
744            )
745            .await?
746            .unwrap();
747
748        assert_eq!(response, library_full.songs);
749
750        Ok(())
751    }
752
753    #[rstest]
754    #[tokio::test]
755    async fn test_dynamic_playlist_create(#[future] client: MusicPlayerClient) -> Result<()> {
756        let client = client.await;
757
758        let ctx = tarpc::context::current();
759
760        let query: Query = "artist CONTAINS \"Artist 0\"".parse()?;
761
762        let response = client
763            .dynamic_playlist_create(ctx, "Dynamic Playlist 0".into(), query)
764            .await?;
765
766        assert!(response.is_ok());
767
768        Ok(())
769    }
770
771    #[rstest]
772    #[tokio::test]
773    async fn test_dynamic_playlist_list(#[future] client: MusicPlayerClient) -> Result<()> {
774        let client = client.await;
775
776        let ctx = tarpc::context::current();
777
778        let query: Query = "artist CONTAINS \"Artist 0\"".parse()?;
779
780        let dynamic_playlist_id = client
781            .dynamic_playlist_create(ctx, "Dynamic Playlist 0".into(), query)
782            .await?
783            .unwrap();
784
785        let ctx = tarpc::context::current();
786        let response = client.dynamic_playlist_list(ctx).await?;
787
788        assert_eq!(response.len(), 1);
789        assert_eq!(response.first().unwrap().id, dynamic_playlist_id.into());
790
791        Ok(())
792    }
793
794    #[rstest]
795    #[tokio::test]
796    async fn test_dynamic_playlist_update(#[future] client: MusicPlayerClient) -> Result<()> {
797        let client = client.await;
798
799        let ctx = tarpc::context::current();
800
801        let query: Query = "artist CONTAINS \"Artist 0\"".parse()?;
802
803        let dynamic_playlist_id = client
804            .dynamic_playlist_create(ctx, "Dynamic Playlist 0".into(), query.clone())
805            .await?
806            .unwrap();
807
808        let ctx = tarpc::context::current();
809        let response = client
810            .dynamic_playlist_update(
811                ctx,
812                dynamic_playlist_id.clone(),
813                DynamicPlaylistChangeSet::new().name("Dynamic Playlist 1"),
814            )
815            .await?;
816
817        let expected = DynamicPlaylist {
818            id: dynamic_playlist_id.clone().into(),
819            name: "Dynamic Playlist 1".into(),
820            query: query.clone(),
821        };
822
823        assert_eq!(response, Ok(expected.clone()));
824
825        let ctx = tarpc::context::current();
826        let response = client
827            .dynamic_playlist_get(ctx, dynamic_playlist_id)
828            .await?
829            .unwrap();
830
831        assert_eq!(response, expected);
832
833        Ok(())
834    }
835
836    #[rstest]
837    #[tokio::test]
838    async fn test_dynamic_playlist_remove(#[future] client: MusicPlayerClient) -> Result<()> {
839        let client = client.await;
840
841        let ctx = tarpc::context::current();
842
843        let query: Query = "artist CONTAINS \"Artist 0\"".parse()?;
844
845        let dynamic_playlist_id = client
846            .dynamic_playlist_create(ctx, "Dynamic Playlist 0".into(), query)
847            .await?
848            .unwrap();
849
850        let ctx = tarpc::context::current();
851        let response = client
852            .dynamic_playlist_remove(ctx, dynamic_playlist_id)
853            .await?;
854
855        assert_eq!(response, Ok(()));
856
857        let ctx = tarpc::context::current();
858        let response = client.dynamic_playlist_list(ctx).await?;
859
860        assert_eq!(response.len(), 0);
861
862        Ok(())
863    }
864
865    #[rstest]
866    #[tokio::test]
867    async fn test_dynamic_playlist_get(#[future] client: MusicPlayerClient) -> Result<()> {
868        let client = client.await;
869
870        let ctx = tarpc::context::current();
871
872        let query: Query = "artist CONTAINS \"Artist 0\"".parse()?;
873
874        let dynamic_playlist_id = client
875            .dynamic_playlist_create(ctx, "Dynamic Playlist 0".into(), query.clone())
876            .await?
877            .unwrap();
878
879        let ctx = tarpc::context::current();
880        let response = client
881            .dynamic_playlist_get(ctx, dynamic_playlist_id)
882            .await?
883            .unwrap();
884
885        assert_eq!(response.name, "Dynamic Playlist 0");
886        assert_eq!(response.query, query);
887
888        Ok(())
889    }
890
891    #[rstest]
892    #[tokio::test]
893    async fn test_dynamic_playlist_get_songs(#[future] client: MusicPlayerClient) -> Result<()> {
894        let client = client.await;
895
896        let ctx = tarpc::context::current();
897
898        let query: Query = "artist CONTAINS \"Artist 0\"".parse()?;
899
900        let dynamic_playlist_id = client
901            .dynamic_playlist_create(ctx, "Dynamic Playlist 0".into(), query)
902            .await?
903            .unwrap();
904
905        let ctx = tarpc::context::current();
906        let response = client
907            .dynamic_playlist_get_songs(ctx, dynamic_playlist_id)
908            .await?
909            .unwrap();
910
911        assert_eq!(response.len(), 1);
912
913        Ok(())
914    }
915
916    // Dynamic Playlist Import Tests
917    #[rstest]
918    #[tokio::test]
919    async fn test_dynamic_playlist_import(#[future] client: MusicPlayerClient) -> Result<()> {
920        let client = client.await;
921
922        let tmpfile = tempfile::NamedTempFile::with_suffix("dps.csv")?;
923
924        // write a csv file to the temp file
925        let mut file = tmpfile.reopen()?;
926        writeln!(file, "dynamic playlist name,query")?;
927        writeln!(file, "Dynamic Playlist 0,artist CONTAINS \"Artist 0\"")?;
928
929        let tmpfile_path = tmpfile.path().to_path_buf();
930
931        let query: Query = "artist CONTAINS \"Artist 0\"".parse()?;
932
933        let ctx = tarpc::context::current();
934        let response = client.dynamic_playlist_import(ctx, tmpfile_path).await??;
935
936        let expected = DynamicPlaylist {
937            id: response[0].id.clone(),
938            name: "Dynamic Playlist 0".into(),
939            query: query.clone(),
940        };
941
942        assert_eq!(response, vec![expected]);
943
944        Ok(())
945    }
946    #[rstest]
947    #[tokio::test]
948    async fn test_dynamic_playlist_import_file_nonexistent(
949        #[future] client: MusicPlayerClient,
950    ) -> Result<()> {
951        let client = client.await;
952
953        let tmpfile = tempfile::NamedTempFile::with_suffix("dps.csv")?;
954
955        // write a csv file to the temp file
956        let mut file = tmpfile.reopen()?;
957        writeln!(file, "artist,album,album_artist,title")?;
958
959        let tmpfile_path = "/this/path/does/not/exist.csv";
960
961        let ctx = tarpc::context::current();
962        let response = client
963            .dynamic_playlist_import(ctx, tmpfile_path.into())
964            .await?;
965        assert!(response.is_err(), "response: {response:?}");
966        assert_eq!(
967            response.unwrap_err().to_string(),
968            format!("Backup Error: The file \"{tmpfile_path}\" does not exist")
969        );
970        Ok(())
971    }
972    #[rstest]
973    #[tokio::test]
974    async fn test_dynamic_playlist_import_file_wrong_extension(
975        #[future] client: MusicPlayerClient,
976    ) -> Result<()> {
977        let client = client.await;
978
979        let tmpfile = tempfile::NamedTempFile::with_suffix("dps.txt")?;
980
981        // write a csv file to the temp file
982        let mut file = tmpfile.reopen()?;
983        writeln!(file, "artist,album,album_artist,title")?;
984
985        let tmpfile_path = tmpfile.path().to_path_buf();
986
987        let ctx = tarpc::context::current();
988        let response = client
989            .dynamic_playlist_import(ctx, tmpfile_path.clone())
990            .await?;
991        assert!(response.is_err(), "response: {response:?}");
992        assert_eq!(
993            response.unwrap_err().to_string(),
994            format!(
995                "Backup Error: The file \"{}\" has the wrong extension, expected: csv",
996                tmpfile_path.display()
997            )
998        );
999        Ok(())
1000    }
1001    #[rstest]
1002    #[tokio::test]
1003    async fn test_dynamic_playlist_import_file_is_directory(
1004        #[future] client: MusicPlayerClient,
1005    ) -> Result<()> {
1006        let client = client.await;
1007
1008        let tmpfile = tempfile::tempdir()?;
1009
1010        let tmpfile_path = tmpfile.path().to_path_buf();
1011
1012        let ctx = tarpc::context::current();
1013        let response = client
1014            .dynamic_playlist_import(ctx, tmpfile_path.clone())
1015            .await?;
1016        assert!(response.is_err());
1017        assert_eq!(
1018            response.unwrap_err().to_string(),
1019            format!(
1020                "Backup Error: {} is a directory, not a file",
1021                tmpfile_path.display()
1022            )
1023        );
1024        Ok(())
1025    }
1026    #[rstest]
1027    #[tokio::test]
1028    async fn test_dynamic_playlist_import_file_invalid_format(
1029        #[future] client: MusicPlayerClient,
1030    ) -> Result<()> {
1031        let client = client.await;
1032
1033        let tmpfile = tempfile::NamedTempFile::with_suffix("dps.csv")?;
1034
1035        // write a csv file to the temp file
1036        let mut file = tmpfile.reopen()?;
1037        writeln!(file, "artist,album,album_artist,title")?;
1038
1039        let tmpfile_path = tmpfile.path().to_path_buf();
1040
1041        let ctx = tarpc::context::current();
1042        let response = client.dynamic_playlist_import(ctx, tmpfile_path).await?;
1043        assert!(response.is_err());
1044        assert_eq!(
1045            response.unwrap_err().to_string(),
1046            "Backup Error: No valid playlists were found in the csv file."
1047        );
1048        Ok(())
1049    }
1050    #[rstest]
1051    #[tokio::test]
1052    async fn test_dynamic_playlist_import_file_invalid_query(
1053        #[future] client: MusicPlayerClient,
1054    ) -> Result<()> {
1055        let client = client.await;
1056
1057        let tmpfile = tempfile::NamedTempFile::with_suffix("dps.csv")?;
1058
1059        // write a csv file to the temp file
1060        let mut file = tmpfile.reopen()?;
1061        writeln!(file, "dynamic playlist name,query")?;
1062        writeln!(file, "Dynamic Playlist 0,artist CONTAINS \"Artist 0\"")?;
1063        writeln!(file, "Dynamic Playlist 1,artist CONTAINS \"")?;
1064
1065        let tmpfile_path = tmpfile.path().to_path_buf();
1066
1067        let ctx = tarpc::context::current();
1068        let response = client.dynamic_playlist_import(ctx, tmpfile_path).await?;
1069        assert!(
1070            matches!(
1071                response,
1072                Err(SerializableLibraryError::BackupError(
1073                    BackupError::InvalidDynamicPlaylistQuery(_, 2)
1074                ))
1075            ),
1076            "response: {response:?}"
1077        );
1078        Ok(())
1079    }
1080
1081    // Dynamic Playlist Export Tests
1082    #[rstest]
1083    #[tokio::test]
1084    async fn test_dynamic_playlist_export(#[future] client: MusicPlayerClient) -> Result<()> {
1085        let client = client.await;
1086
1087        let tmpdir = tempfile::tempdir()?;
1088        let path = tmpdir.path().join("test.csv");
1089
1090        let query: Query = "artist CONTAINS \"Artist 0\"".parse()?;
1091        let ctx = tarpc::context::current();
1092        let _ = client
1093            .dynamic_playlist_create(ctx, "Dynamic Playlist 0".into(), query.clone())
1094            .await?
1095            .unwrap();
1096
1097        let expected = r#"dynamic playlist name,query
1098Dynamic Playlist 0,"artist CONTAINS ""Artist 0"""
1099"#;
1100
1101        let response = client.dynamic_playlist_export(ctx, path.clone()).await?;
1102        assert_eq!(response, Ok(()));
1103
1104        let mut file = std::fs::File::open(path.clone())?;
1105        let mut contents = String::new();
1106        file.read_to_string(&mut contents)?;
1107        assert_str_eq!(contents, expected);
1108
1109        Ok(())
1110    }
1111    #[rstest]
1112    #[tokio::test]
1113    async fn test_dynamic_playlist_export_file_exists(
1114        #[future] client: MusicPlayerClient,
1115    ) -> Result<()> {
1116        let client = client.await;
1117
1118        let tmpfile = tempfile::NamedTempFile::with_suffix("dps.csv")?;
1119
1120        let ctx = tarpc::context::current();
1121        let response = client
1122            .dynamic_playlist_export(ctx, tmpfile.path().to_path_buf())
1123            .await?;
1124        assert!(matches!(response, Ok(())), "response: {response:?}");
1125        Ok(())
1126    }
1127    #[rstest]
1128    #[tokio::test]
1129    async fn test_dynamic_playlist_export_file_is_directory(
1130        #[future] client: MusicPlayerClient,
1131    ) -> Result<()> {
1132        let client = client.await;
1133
1134        let tmpfile = tempfile::tempdir()?;
1135
1136        let ctx = tarpc::context::current();
1137        let response = client
1138            .dynamic_playlist_export(ctx, tmpfile.path().to_path_buf())
1139            .await?;
1140        assert!(
1141            matches!(
1142                response,
1143                Err(SerializableLibraryError::BackupError(
1144                    BackupError::PathIsDirectory(_)
1145                ))
1146            ),
1147            "response: {response:?}"
1148        );
1149        Ok(())
1150    }
1151    #[rstest]
1152    #[tokio::test]
1153    async fn test_dynamic_playlist_export_file_invalid_extension(
1154        #[future] client: MusicPlayerClient,
1155    ) -> Result<()> {
1156        let client = client.await;
1157
1158        let tmpfile = tempfile::NamedTempFile::with_suffix("dps.txt")?;
1159
1160        let ctx = tarpc::context::current();
1161        let response = client
1162            .dynamic_playlist_export(ctx, tmpfile.path().to_path_buf())
1163            .await?;
1164        assert!(response.is_err(), "response: {response:?}");
1165        let err = response.unwrap_err();
1166        assert!(
1167            matches!(
1168                &err,
1169                SerializableLibraryError::BackupError(
1170                    BackupError::WrongExtension(_, expected_extension)
1171                ) if expected_extension == "csv"
1172            ),
1173            "response: {err:?}"
1174        );
1175
1176        Ok(())
1177    }
1178
1179    // Playlist import test
1180    #[rstest]
1181    #[tokio::test]
1182    async fn test_playlist_import(#[future] client: MusicPlayerClient) -> Result<()> {
1183        let client = client.await;
1184
1185        let tmpfile = tempfile::NamedTempFile::with_suffix("pl.m3u")?;
1186
1187        // write a csv file to the temp file
1188        let mut file = tmpfile.reopen()?;
1189        write!(
1190            file,
1191            r"#EXTM3U
1192#EXTINF:123,Sample Artist - Sample title
1193/path/to/song.mp3
1194"
1195        )?;
1196
1197        let tmpfile_path = tmpfile.path().to_path_buf();
1198
1199        let ctx = tarpc::context::current();
1200        let response = client.playlist_import(ctx, tmpfile_path, None).await?;
1201        assert!(response.is_ok());
1202        let response = response.unwrap();
1203
1204        let ctx = tarpc::context::current();
1205        let playlist = client.playlist_get(ctx, response.clone()).await?.unwrap();
1206
1207        assert_eq!(playlist.name, "Imported Playlist");
1208        assert_eq!(playlist.song_count, 1);
1209
1210        let ctx = tarpc::context::current();
1211        let songs = client
1212            .playlist_get_songs(ctx, response.clone())
1213            .await?
1214            .unwrap();
1215        assert_eq!(songs.len(), 1);
1216        assert_eq!(songs[0].path.to_string_lossy(), "/path/to/song.mp3");
1217
1218        Ok(())
1219    }
1220
1221    #[rstest]
1222    #[tokio::test]
1223    async fn test_playlist_export(#[future] client: MusicPlayerClient) -> Result<()> {
1224        let client = client.await;
1225
1226        let tmpdir = tempfile::tempdir()?;
1227        let path = tmpdir.path().join("test.m3u");
1228
1229        let ctx = tarpc::context::current();
1230        let library_full: LibraryFull = client.library_full(ctx).await??;
1231
1232        let playlist = library_full.playlists[0].clone();
1233
1234        let response = client
1235            .playlist_export(ctx, playlist.id.clone().into(), path.clone())
1236            .await?;
1237        assert_eq!(response, Ok(()));
1238
1239        let mut file = std::fs::File::open(path.clone())?;
1240        let mut contents = String::new();
1241        file.read_to_string(&mut contents)?;
1242        assert_str_eq!(
1243            contents,
1244            r"#EXTM3U
1245
1246#PLAYLIST:Playlist 0
1247
1248#EXTINF:120,Song 0 - Artist 0
1249#EXTGENRE:Genre 0
1250#EXTALB:Artist 0
1251/path/to/song.mp3
1252
1253"
1254        );
1255
1256        Ok(())
1257    }
1258}