mecomp_daemon/
lib.rs

1#![deny(clippy::missing_inline_in_public_items)]
2
3//----------------------------------------------------------------------------------------- std lib
4use std::{
5    net::{IpAddr, Ipv4Addr},
6    sync::Arc,
7};
8//--------------------------------------------------------------------------------- other libraries
9use futures::{future, prelude::*};
10use log::info;
11use surrealdb::{engine::local::Db, Surreal};
12use tarpc::{
13    self,
14    server::{incoming::Incoming as _, BaseChannel, Channel as _},
15    tokio_serde::formats::Json,
16};
17//-------------------------------------------------------------------------------- MECOMP libraries
18use mecomp_core::{
19    audio::AudioKernelSender,
20    config::Settings,
21    is_server_running,
22    logger::{init_logger, init_tracing},
23    rpc::{MusicPlayer as _, MusicPlayerClient},
24    udp::{Message, Sender},
25};
26use mecomp_storage::db::{init_database, set_database_path};
27use tokio::sync::RwLock;
28
29async fn spawn(fut: impl Future<Output = ()> + Send + 'static) {
30    tokio::spawn(fut);
31}
32
33pub mod controller;
34#[cfg(feature = "dynamic_updates")]
35pub mod dynamic_updates;
36pub mod services;
37#[cfg(test)]
38pub use mecomp_core::test_utils;
39
40use crate::controller::MusicPlayerServer;
41
42// TODO: at some point, we should probably add a panic handler to the daemon to ensure graceful shutdown.
43
44/// Run the daemon
45///
46/// also initializes the logger, database, and other necessary components.
47///
48/// # Arguments
49///
50/// * `settings` - The settings to use.
51/// * `db_dir` - The directory where the database is stored.
52///              If the directory does not exist, it will be created.
53/// * `log_file_path` - The path to the file where logs will be written.
54///
55/// # Errors
56///
57/// If the daemon cannot be started, an error is returned.
58///
59/// # Panics
60///
61/// Panics if the peer address of the underlying TCP transport cannot be determined.
62#[inline]
63pub async fn start_daemon(
64    settings: Settings,
65    db_dir: std::path::PathBuf,
66    log_file_path: Option<std::path::PathBuf>,
67) -> anyhow::Result<()> {
68    // Throw the given settings into an Arc so we can share settings across threads.
69    let settings = Arc::new(settings);
70
71    // check if a server is already running
72    if is_server_running(settings.daemon.rpc_port) {
73        anyhow::bail!(
74            "A server is already running on port {}",
75            settings.daemon.rpc_port
76        );
77    }
78
79    // Initialize the logger, database, and tracing.
80    init_logger(settings.daemon.log_level, log_file_path);
81    set_database_path(db_dir)?;
82    let db = Arc::new(init_database().await?);
83    tracing::subscriber::set_global_default(init_tracing())?;
84
85    // Start the music library watcher.
86    #[cfg(feature = "dynamic_updates")]
87    let guard = dynamic_updates::init_music_library_watcher(
88        db.clone(),
89        &settings.daemon.library_paths,
90        settings.daemon.artist_separator.clone(),
91        settings.daemon.genre_separator.clone(),
92    )?;
93
94    // Start the audio kernel.
95    let (event_tx, event_rx) = std::sync::mpsc::channel();
96    let audio_kernel = AudioKernelSender::start(event_tx);
97    let event_publisher = Arc::new(RwLock::new(Sender::new().await?));
98    let server = MusicPlayerServer::new(
99        db.clone(),
100        settings.clone(),
101        audio_kernel.clone(),
102        event_publisher.clone(),
103    );
104
105    // Start StateChange publisher thread.
106    // this thread listens for events from the audio kernel and forwards them to the event publisher (managed by the daemon)
107    // the event publisher then pushes them to all the clients
108    let eft_guard = tokio::spawn(async move {
109        while let Ok(event) = event_rx.recv() {
110            event_publisher
111                .read()
112                .await
113                .send(Message::StateChange(event))
114                .await
115                .unwrap();
116        }
117    });
118
119    // TODO: set up some kind of signal handler to ensure that the daemon is shut down gracefully (including sending a `DaemonShutdown` event to all clients)
120
121    // Start the RPC server.
122    let server_addr = (IpAddr::V4(Ipv4Addr::LOCALHOST), settings.daemon.rpc_port);
123
124    let mut listener = tarpc::serde_transport::tcp::listen(&server_addr, Json::default).await?;
125    info!("Listening on {}", listener.local_addr());
126    listener.config_mut().max_frame_length(usize::MAX);
127    listener
128        // Ignore accept errors.
129        .filter_map(|r| future::ready(r.ok()))
130        .map(BaseChannel::with_defaults)
131        // Limit channels to 10 per IP.
132        .max_channels_per_key(10, |t| t.transport().peer_addr().unwrap().ip())
133        // Set up the server's handling of incoming connections.
134        // serve is generated by the service attribute.
135        // It takes as input any type implementing the generated MusicPlayer trait.
136        .map(|channel| channel.execute(server.clone().serve()).for_each(spawn))
137        // Max 10 channels.
138        // this means that we will only process 10 requests at a time
139        // NOTE: if we have issues with concurrency (e.g. deadlocks or data-races),
140        //       and have too much of a skill issue to fix it, we can set this number to 1.
141        .buffer_unordered(10)
142        .for_each(|()| async {})
143        .await;
144
145    #[cfg(feature = "dynamic_updates")]
146    guard.stop();
147
148    eft_guard.abort();
149
150    Ok(())
151}
152
153/// Initialize a test client, sends and receives messages over a channel / pipe.
154/// This is useful for testing the server without needing to start it.
155///
156/// # Errors
157///
158/// Errors if the event publisher cannot be created.
159#[inline]
160pub async fn init_test_client_server(
161    db: Arc<Surreal<Db>>,
162    settings: Arc<Settings>,
163    audio_kernel: Arc<AudioKernelSender>,
164) -> anyhow::Result<MusicPlayerClient> {
165    let (client_transport, server_transport) = tarpc::transport::channel::unbounded();
166
167    let event_publisher = Arc::new(RwLock::new(Sender::new().await?));
168    let server = MusicPlayerServer::new(db, settings, audio_kernel, event_publisher);
169    tokio::spawn(
170        tarpc::server::BaseChannel::with_defaults(server_transport)
171            .execute(server.serve())
172            // Handle all requests concurrently.
173            .for_each(|response| async move {
174                tokio::spawn(response);
175            }),
176    );
177
178    // MusicPlayerClient is generated by the #[tarpc::service] attribute. It has a constructor `new`
179    // that takes a config and any Transport as input.
180    Ok(MusicPlayerClient::new(tarpc::client::Config::default(), client_transport).spawn())
181}
182
#[cfg(test)]
mod test_client_tests {
    //! Tests for:
    //! - the `init_test_client_server` function
    //! - daemon endpoints that aren't covered in other tests

    use super::*;
    use anyhow::Result;
    use mecomp_core::state::library::LibraryFull;
    use mecomp_storage::{
        db::schemas::{
            collection::Collection,
            dynamic::{query::Query, DynamicPlaylist, DynamicPlaylistChangeSet},
            playlist::Playlist,
            song::SongChangeSet,
        },
        test_utils::{create_song_with_overrides, init_test_database, SongCase},
    };

    use pretty_assertions::assert_eq;
    use rstest::{fixture, rstest};

    /// Fixture: a test database seeded with a single song ("Artist 0" / "Album 0"),
    /// one playlist ("Playlist 0") and one collection ("Collection 0"), each of which
    /// contains that song.
    #[fixture]
    async fn db() -> Arc<Surreal<Db>> {
        let db = Arc::new(init_test_database().await.unwrap());

        // create a test song, add it to a playlist and collection

        let song_case = SongCase::new(0, vec![0], vec![0], 0, 0);

        // Call the create_song function
        let song = create_song_with_overrides(
            &db,
            song_case,
            SongChangeSet {
                // need to specify overrides so that items are created in the db
                artist: Some(one_or_many::OneOrMany::One("Artist 0".into())),
                album_artist: Some(one_or_many::OneOrMany::One("Artist 0".into())),
                album: Some("Album 0".into()),
                ..Default::default()
            },
        )
        .await
        .unwrap();

        // create a playlist with the song
        let playlist = Playlist {
            id: Playlist::generate_id(),
            name: "Playlist 0".into(),
            runtime: song.runtime,
            song_count: 1,
        };

        let created_playlist = Playlist::create(&db, playlist).await.unwrap().unwrap();

        Playlist::add_songs(&db, created_playlist.id, vec![song.id.clone()])
            .await
            .unwrap();

        // create a collection with the song
        let collection = Collection {
            id: Collection::generate_id(),
            name: "Collection 0".into(),
            runtime: song.runtime,
            song_count: 1,
        };

        let created_collection = Collection::create(&db, collection).await.unwrap().unwrap();

        Collection::add_songs(&db, created_collection.id, vec![song.id])
            .await
            .unwrap();

        db
    }

    /// Fixture: a client connected to an in-process server backed by the seeded `db` fixture.
    #[fixture]
    async fn client(#[future] db: Arc<Surreal<Db>>) -> MusicPlayerClient {
        let settings = Arc::new(Settings::default());
        // The receiving half is discarded: events emitted by the audio kernel are not observed here.
        let (tx, _) = std::sync::mpsc::channel();
        let audio_kernel = AudioKernelSender::start(tx);

        init_test_client_server(db.await, settings, audio_kernel)
            .await
            .unwrap()
    }

    /// Smoke test: a client built by `init_test_client_server` (on an unseeded database)
    /// gets "pong" back from `ping`.
    #[tokio::test]
    async fn test_init_test_client_server() {
        let db = Arc::new(init_test_database().await.unwrap());
        let settings = Arc::new(Settings::default());
        let (tx, _) = std::sync::mpsc::channel();
        let audio_kernel = AudioKernelSender::start(tx);

        let client = init_test_client_server(db, settings, audio_kernel)
            .await
            .unwrap();

        let ctx = tarpc::context::current();
        let response = client.ping(ctx).await.unwrap();

        assert_eq!(response, "pong");

        // ensure that the client is shutdown properly
        drop(client);
    }

    /// The artists of the first song should equal every artist in the library.
    #[rstest]
    #[tokio::test]
    async fn test_library_song_get_artist(#[future] client: MusicPlayerClient) -> Result<()> {
        let client = client.await;

        let ctx = tarpc::context::current();
        let library_full: LibraryFull = client.library_full(ctx).await??;

        let ctx = tarpc::context::current();
        let response = client
            .library_song_get_artist(ctx, library_full.songs.first().unwrap().id.clone().into())
            .await?;

        assert_eq!(response, library_full.artists.into_vec().into());

        Ok(())
    }

    /// The album of the first song should be the first album in the library.
    #[rstest]
    #[tokio::test]
    async fn test_library_song_get_album(#[future] client: MusicPlayerClient) -> Result<()> {
        let client = client.await;

        let ctx = tarpc::context::current();
        let library_full: LibraryFull = client.library_full(ctx).await??;

        let ctx = tarpc::context::current();
        let response = client
            .library_song_get_album(ctx, library_full.songs.first().unwrap().id.clone().into())
            .await?
            .unwrap();

        assert_eq!(response, library_full.albums.first().unwrap().clone());

        Ok(())
    }

    /// The playlists containing the first song should equal every playlist in the library.
    #[rstest]
    #[tokio::test]
    async fn test_library_song_get_playlists(#[future] client: MusicPlayerClient) -> Result<()> {
        let client = client.await;

        let ctx = tarpc::context::current();
        let library_full: LibraryFull = client.library_full(ctx).await??;

        let ctx = tarpc::context::current();
        let response = client
            .library_song_get_playlists(ctx, library_full.songs.first().unwrap().id.clone().into())
            .await?;

        assert_eq!(response, library_full.playlists.into_vec().into());

        Ok(())
    }

    /// The artists of the first album should equal every artist in the library.
    #[rstest]
    #[tokio::test]
    async fn test_library_album_get_artist(#[future] client: MusicPlayerClient) -> Result<()> {
        let client = client.await;

        let ctx = tarpc::context::current();
        let library_full: LibraryFull = client.library_full(ctx).await??;

        let ctx = tarpc::context::current();
        let response = client
            .library_album_get_artist(ctx, library_full.albums.first().unwrap().id.clone().into())
            .await?;

        assert_eq!(response, library_full.artists.into_vec().into());

        Ok(())
    }

    /// The songs of the first album should equal every song in the library.
    #[rstest]
    #[tokio::test]
    async fn test_library_album_get_songs(#[future] client: MusicPlayerClient) -> Result<()> {
        let client = client.await;

        let ctx = tarpc::context::current();
        let library_full: LibraryFull = client.library_full(ctx).await??;

        let ctx = tarpc::context::current();
        let response = client
            .library_album_get_songs(ctx, library_full.albums.first().unwrap().id.clone().into())
            .await?
            .unwrap();

        assert_eq!(response, library_full.songs);

        Ok(())
    }

    /// The songs of the first artist should equal every song in the library.
    #[rstest]
    #[tokio::test]
    async fn test_library_artist_get_songs(#[future] client: MusicPlayerClient) -> Result<()> {
        let client = client.await;

        let ctx = tarpc::context::current();
        let library_full: LibraryFull = client.library_full(ctx).await??;

        let ctx = tarpc::context::current();
        let response = client
            .library_artist_get_songs(ctx, library_full.artists.first().unwrap().id.clone().into())
            .await?
            .unwrap();

        assert_eq!(response, library_full.songs);

        Ok(())
    }

    /// The albums of the first artist should equal every album in the library.
    #[rstest]
    #[tokio::test]
    async fn test_library_artist_get_albums(#[future] client: MusicPlayerClient) -> Result<()> {
        let client = client.await;

        let ctx = tarpc::context::current();
        let library_full: LibraryFull = client.library_full(ctx).await??;

        let ctx = tarpc::context::current();
        let response = client
            .library_artist_get_albums(ctx, library_full.artists.first().unwrap().id.clone().into())
            .await?
            .unwrap();

        assert_eq!(response, library_full.albums);

        Ok(())
    }

    /// The mute-toggle endpoint should complete without an RPC error.
    #[rstest]
    #[tokio::test]
    async fn test_playback_volume_toggle_mute(#[future] client: MusicPlayerClient) -> Result<()> {
        let client = client.await;

        let ctx = tarpc::context::current();

        client.playback_volume_toggle_mute(ctx).await?;
        Ok(())
    }

    /// The stop endpoint should complete without an RPC error.
    #[rstest]
    #[tokio::test]
    async fn test_playback_stop(#[future] client: MusicPlayerClient) -> Result<()> {
        let client = client.await;

        let ctx = tarpc::context::current();

        client.playback_stop(ctx).await?;
        Ok(())
    }

    /// Adding a list consisting of the first song to the queue should succeed.
    #[rstest]
    #[tokio::test]
    async fn test_queue_add_list(#[future] client: MusicPlayerClient) -> Result<()> {
        let client = client.await;

        let ctx = tarpc::context::current();
        let library_full: LibraryFull = client.library_full(ctx).await??;

        let ctx = tarpc::context::current();
        let response = client
            .queue_add_list(
                ctx,
                vec![library_full.songs.first().unwrap().id.clone().into()],
            )
            .await?;

        assert_eq!(response, Ok(()));

        Ok(())
    }

    /// `playlist_get_or_create` should return an id that resolves to a playlist with the
    /// requested name, both for the seeded "Playlist 0" and for the new "Playlist 1".
    #[rstest]
    #[case::get(String::from("Playlist 0"))]
    #[case::create(String::from("Playlist 1"))]
    #[tokio::test]
    async fn test_playlist_get_or_create(
        #[future] client: MusicPlayerClient,
        #[case] name: String,
    ) -> Result<()> {
        let client = client.await;

        let ctx = tarpc::context::current();

        // get or create the playlist
        let playlist_id = client
            .playlist_get_or_create(ctx, name.clone())
            .await?
            .unwrap();

        // now get that playlist
        let ctx = tarpc::context::current();
        let playlist = client.playlist_get(ctx, playlist_id).await?.unwrap();

        assert_eq!(playlist.name, name);

        Ok(())
    }

    /// Cloning the seeded playlist should produce a playlist named "Playlist 0 (copy)".
    #[rstest]
    #[tokio::test]
    async fn test_playlist_clone(#[future] client: MusicPlayerClient) -> Result<()> {
        let client = client.await;

        let ctx = tarpc::context::current();
        let library_full: LibraryFull = client.library_full(ctx).await??;

        // clone the only playlist in the db
        let ctx = tarpc::context::current();
        let playlist_id = client
            .playlist_clone(
                ctx,
                library_full.playlists.first().unwrap().id.clone().into(),
            )
            .await?
            .unwrap();

        // now get that playlist
        let ctx = tarpc::context::current();
        let playlist = client.playlist_get(ctx, playlist_id).await?.unwrap();

        assert_eq!(playlist.name, "Playlist 0 (copy)");

        Ok(())
    }

    /// The songs of the first playlist should equal every song in the library.
    #[rstest]
    #[tokio::test]
    async fn test_playlist_get_songs(#[future] client: MusicPlayerClient) -> Result<()> {
        let client = client.await;

        let ctx = tarpc::context::current();
        let library_full: LibraryFull = client.library_full(ctx).await??;

        // fetch the songs of the only playlist in the db
        // (a fresh context per request, like every other test in this module)
        let ctx = tarpc::context::current();
        let response = client
            .playlist_get_songs(
                ctx,
                library_full.playlists.first().unwrap().id.clone().into(),
            )
            .await?
            .unwrap();

        assert_eq!(response, library_full.songs);

        Ok(())
    }

    /// Renaming the first playlist should return the renamed playlist, and a subsequent
    /// `playlist_get` should return the same renamed playlist.
    #[rstest]
    #[tokio::test]
    async fn test_playlist_rename(#[future] client: MusicPlayerClient) -> Result<()> {
        let client = client.await;

        let ctx = tarpc::context::current();
        let library_full: LibraryFull = client.library_full(ctx).await??;

        let target = library_full.playlists.first().unwrap();

        let ctx = tarpc::context::current();
        let response = client
            .playlist_rename(ctx, target.id.clone().into(), "New Name".into())
            .await?;

        // only the name is expected to differ from the original playlist
        let expected = Playlist {
            name: "New Name".into(),
            ..target.clone()
        };

        assert_eq!(response, Ok(expected.clone()));

        let ctx = tarpc::context::current();
        let response = client
            .playlist_get(ctx, target.id.clone().into())
            .await?
            .unwrap();

        assert_eq!(response, expected);
        Ok(())
    }

    /// The songs of the first collection should equal every song in the library.
    #[rstest]
    #[tokio::test]
    async fn test_collection_get_songs(#[future] client: MusicPlayerClient) -> Result<()> {
        let client = client.await;

        let ctx = tarpc::context::current();
        let library_full: LibraryFull = client.library_full(ctx).await??;

        // fetch the songs of the only collection in the db
        // (a fresh context per request, like every other test in this module)
        let ctx = tarpc::context::current();
        let response = client
            .collection_get_songs(
                ctx,
                library_full.collections.first().unwrap().id.clone().into(),
            )
            .await?
            .unwrap();

        assert_eq!(response, library_full.songs);

        Ok(())
    }

    /// Creating a dynamic playlist from a valid query should succeed.
    #[rstest]
    #[tokio::test]
    async fn test_dynamic_playlist_create(#[future] client: MusicPlayerClient) -> Result<()> {
        let client = client.await;

        let ctx = tarpc::context::current();

        let query: Query = "artist CONTAINS \"Artist 0\"".parse()?;

        let response = client
            .dynamic_playlist_create(ctx, "Dynamic Playlist 0".into(), query)
            .await?;

        assert!(response.is_ok());

        Ok(())
    }

    /// After creating one dynamic playlist, listing should return exactly that playlist.
    #[rstest]
    #[tokio::test]
    async fn test_dynamic_playlist_list(#[future] client: MusicPlayerClient) -> Result<()> {
        let client = client.await;

        let ctx = tarpc::context::current();

        let query: Query = "artist CONTAINS \"Artist 0\"".parse()?;

        let dynamic_playlist_id = client
            .dynamic_playlist_create(ctx, "Dynamic Playlist 0".into(), query)
            .await?
            .unwrap();

        let ctx = tarpc::context::current();
        let response = client.dynamic_playlist_list(ctx).await?;

        assert_eq!(response.len(), 1);
        assert_eq!(response.first().unwrap().id, dynamic_playlist_id.into());

        Ok(())
    }

    /// Renaming a dynamic playlist via a change set should return the updated playlist
    /// (same id and query, new name), and `dynamic_playlist_get` should agree.
    #[rstest]
    #[tokio::test]
    async fn test_dynamic_playlist_update(#[future] client: MusicPlayerClient) -> Result<()> {
        let client = client.await;

        let ctx = tarpc::context::current();

        let query: Query = "artist CONTAINS \"Artist 0\"".parse()?;

        let dynamic_playlist_id = client
            .dynamic_playlist_create(ctx, "Dynamic Playlist 0".into(), query.clone())
            .await?
            .unwrap();

        let ctx = tarpc::context::current();
        let response = client
            .dynamic_playlist_update(
                ctx,
                dynamic_playlist_id.clone(),
                DynamicPlaylistChangeSet::new().name("Dynamic Playlist 1"),
            )
            .await?;

        let expected = DynamicPlaylist {
            id: dynamic_playlist_id.clone().into(),
            name: "Dynamic Playlist 1".into(),
            // last use of `query`, so it can be moved instead of cloned
            query,
        };

        assert_eq!(response, Ok(expected.clone()));

        let ctx = tarpc::context::current();
        let response = client
            .dynamic_playlist_get(ctx, dynamic_playlist_id)
            .await?
            .unwrap();

        assert_eq!(response, expected);

        Ok(())
    }

    /// Removing the only dynamic playlist should succeed and leave the list empty.
    #[rstest]
    #[tokio::test]
    async fn test_dynamic_playlist_remove(#[future] client: MusicPlayerClient) -> Result<()> {
        let client = client.await;

        let ctx = tarpc::context::current();

        let query: Query = "artist CONTAINS \"Artist 0\"".parse()?;

        let dynamic_playlist_id = client
            .dynamic_playlist_create(ctx, "Dynamic Playlist 0".into(), query)
            .await?
            .unwrap();

        let ctx = tarpc::context::current();
        let response = client
            .dynamic_playlist_remove(ctx, dynamic_playlist_id)
            .await?;

        assert_eq!(response, Ok(()));

        let ctx = tarpc::context::current();
        let response = client.dynamic_playlist_list(ctx).await?;

        assert_eq!(response.len(), 0);

        Ok(())
    }

    /// Fetching a freshly created dynamic playlist should return its name and query unchanged.
    #[rstest]
    #[tokio::test]
    async fn test_dynamic_playlist_get(#[future] client: MusicPlayerClient) -> Result<()> {
        let client = client.await;

        let ctx = tarpc::context::current();

        let query: Query = "artist CONTAINS \"Artist 0\"".parse()?;

        let dynamic_playlist_id = client
            .dynamic_playlist_create(ctx, "Dynamic Playlist 0".into(), query.clone())
            .await?
            .unwrap();

        let ctx = tarpc::context::current();
        let response = client
            .dynamic_playlist_get(ctx, dynamic_playlist_id)
            .await?
            .unwrap();

        assert_eq!(response.name, "Dynamic Playlist 0");
        assert_eq!(response.query, query);

        Ok(())
    }

    /// A dynamic playlist matching "Artist 0" should resolve to the single seeded song.
    #[rstest]
    #[tokio::test]
    async fn test_dynamic_playlist_get_songs(#[future] client: MusicPlayerClient) -> Result<()> {
        let client = client.await;

        let ctx = tarpc::context::current();

        let query: Query = "artist CONTAINS \"Artist 0\"".parse()?;

        let dynamic_playlist_id = client
            .dynamic_playlist_create(ctx, "Dynamic Playlist 0".into(), query)
            .await?
            .unwrap();

        let ctx = tarpc::context::current();
        let response = client
            .dynamic_playlist_get_songs(ctx, dynamic_playlist_id)
            .await?
            .unwrap();

        assert_eq!(response.len(), 1);

        Ok(())
    }
}