mecomp_daemon/
lib.rs

1//----------------------------------------------------------------------------------------- std lib
2use std::{
3    net::{IpAddr, Ipv4Addr},
4    sync::Arc,
5};
6//--------------------------------------------------------------------------------- other libraries
7use futures::{future, prelude::*};
8use log::info;
9use surrealdb::{engine::local::Db, Surreal};
10use tarpc::{
11    self,
12    server::{incoming::Incoming as _, BaseChannel, Channel as _},
13    tokio_serde::formats::Json,
14};
15//-------------------------------------------------------------------------------- MECOMP libraries
16use mecomp_core::{
17    audio::AudioKernelSender,
18    is_server_running,
19    logger::{init_logger, init_tracing},
20    rpc::{MusicPlayer as _, MusicPlayerClient},
21    udp::{Message, Sender},
22};
23use mecomp_storage::db::{init_database, set_database_path};
24use tokio::sync::RwLock;
25
26async fn spawn(fut: impl Future<Output = ()> + Send + 'static) {
27    tokio::spawn(fut);
28}
29
30pub mod config;
31pub mod controller;
32#[cfg(feature = "dynamic_updates")]
33pub mod dynamic_updates;
34pub mod services;
35#[cfg(test)]
36pub use mecomp_core::test_utils;
37
38use crate::config::Settings;
39use crate::controller::MusicPlayerServer;
40
41// TODO: at some point, we should probably add a panic handler to the daemon to ensure graceful shutdown.
42
43/// Run the daemon
44///
45/// also initializes the logger, database, and other necessary components.
46///
47/// # Arguments
48///
49/// * `settings` - The settings to use.
50/// * `db_dir` - The directory where the database is stored.
51///              If the directory does not exist, it will be created.
52/// * `log_file_path` - The path to the file where logs will be written.
53///
54/// # Errors
55///
56/// If the daemon cannot be started, an error is returned.
57///
58/// # Panics
59///
60/// Panics if the peer address of the underlying TCP transport cannot be determined.
61pub async fn start_daemon(
62    settings: Settings,
63    db_dir: std::path::PathBuf,
64    log_file_path: Option<std::path::PathBuf>,
65) -> anyhow::Result<()> {
66    // Throw the given settings into an Arc so we can share settings across threads.
67    let settings = Arc::new(settings);
68
69    // check if a server is already running
70    if is_server_running(settings.daemon.rpc_port) {
71        anyhow::bail!(
72            "A server is already running on port {}",
73            settings.daemon.rpc_port
74        );
75    }
76
77    // Initialize the logger, database, and tracing.
78    init_logger(settings.daemon.log_level, log_file_path);
79    set_database_path(db_dir)?;
80    let db = Arc::new(init_database().await?);
81    tracing::subscriber::set_global_default(init_tracing())?;
82
83    // Start the music library watcher.
84    #[cfg(feature = "dynamic_updates")]
85    let guard = dynamic_updates::init_music_library_watcher(
86        db.clone(),
87        &settings.daemon.library_paths,
88        settings.daemon.artist_separator.clone(),
89        settings.daemon.genre_separator.clone(),
90    )?;
91
92    // Start the audio kernel.
93    let (event_tx, event_rx) = std::sync::mpsc::channel();
94    let audio_kernel = AudioKernelSender::start(event_tx);
95    let event_publisher = Arc::new(RwLock::new(Sender::new().await?));
96    let server = MusicPlayerServer::new(
97        db.clone(),
98        settings.clone(),
99        audio_kernel.clone(),
100        event_publisher.clone(),
101    );
102
103    // Start StateChange publisher thread.
104    // this thread listens for events from the audio kernel and forwards them to the event publisher (managed by the daemon)
105    // the event publisher then pushes them to all the clients
106    let eft_guard = tokio::spawn(async move {
107        while let Ok(event) = event_rx.recv() {
108            event_publisher
109                .read()
110                .await
111                .send(Message::StateChange(event))
112                .await
113                .unwrap();
114        }
115    });
116
117    // TODO: set up some kind of signal handler to ensure that the daemon is shut down gracefully (including sending a `DaemonShutdown` event to all clients)
118
119    // Start the RPC server.
120    let server_addr = (IpAddr::V4(Ipv4Addr::LOCALHOST), settings.daemon.rpc_port);
121
122    let mut listener = tarpc::serde_transport::tcp::listen(&server_addr, Json::default).await?;
123    info!("Listening on {}", listener.local_addr());
124    listener.config_mut().max_frame_length(usize::MAX);
125    listener
126        // Ignore accept errors.
127        .filter_map(|r| future::ready(r.ok()))
128        .map(BaseChannel::with_defaults)
129        // Limit channels to 10 per IP.
130        .max_channels_per_key(10, |t| t.transport().peer_addr().unwrap().ip())
131        // Set up the server's handling of incoming connections.
132        // serve is generated by the service attribute.
133        // It takes as input any type implementing the generated MusicPlayer trait.
134        .map(|channel| channel.execute(server.clone().serve()).for_each(spawn))
135        // Max 10 channels.
136        // this means that we will only process 10 requests at a time
137        // NOTE: if we have issues with concurrency (e.g. deadlocks or data-races),
138        //       and have too much of a skill issue to fix it, we can set this number to 1.
139        .buffer_unordered(10)
140        .for_each(|()| async {})
141        .await;
142
143    #[cfg(feature = "dynamic_updates")]
144    guard.stop();
145
146    eft_guard.abort();
147
148    Ok(())
149}
150
151/// Initialize a test client, sends and receives messages over a channel / pipe.
152/// This is useful for testing the server without needing to start it.
153///
154/// # Errors
155///
156/// Errors if the event publisher cannot be created.
157pub async fn init_test_client_server(
158    db: Arc<Surreal<Db>>,
159    settings: Arc<Settings>,
160    audio_kernel: Arc<AudioKernelSender>,
161) -> anyhow::Result<MusicPlayerClient> {
162    let (client_transport, server_transport) = tarpc::transport::channel::unbounded();
163
164    let event_publisher = Arc::new(RwLock::new(Sender::new().await?));
165    let server = MusicPlayerServer::new(db, settings, audio_kernel, event_publisher);
166    tokio::spawn(
167        tarpc::server::BaseChannel::with_defaults(server_transport)
168            .execute(server.serve())
169            // Handle all requests concurrently.
170            .for_each(|response| async move {
171                tokio::spawn(response);
172            }),
173    );
174
175    // MusicPlayerClient is generated by the #[tarpc::service] attribute. It has a constructor `new`
176    // that takes a config and any Transport as input.
177    Ok(MusicPlayerClient::new(tarpc::client::Config::default(), client_transport).spawn())
178}
179
#[cfg(test)]
mod test_client_tests {
    //! Tests for:
    //! - the `init_test_client_server` function
    //! - daemon endpoints that aren't covered in other tests

    use super::*;
    use anyhow::Result;
    use mecomp_core::state::library::LibraryFull;
    use mecomp_storage::{
        db::schemas::{
            collection::Collection,
            dynamic::{query::Query, DynamicPlaylist, DynamicPlaylistChangeSet},
            playlist::Playlist,
            song::SongChangeSet,
        },
        test_utils::{create_song_with_overrides, init_test_database, SongCase},
    };

    use pretty_assertions::assert_eq;
    use rstest::{fixture, rstest};

    /// Fixture: a test database seeded with a single song ("Artist 0" / "Album 0"),
    /// one playlist ("Playlist 0") and one collection ("Collection 0") that both contain the song.
    #[fixture]
    async fn db() -> Arc<Surreal<Db>> {
        let db = Arc::new(init_test_database().await.unwrap());

        // create a test song, add it to a playlist and collection

        let song_case = SongCase::new(0, vec![0], vec![0], 0, 0);

        // Call the create_song function
        let song = create_song_with_overrides(
            &db,
            song_case,
            SongChangeSet {
                // need to specify overrides so that items are created in the db
                artist: Some(one_or_many::OneOrMany::One("Artist 0".into())),
                album_artist: Some(one_or_many::OneOrMany::One("Artist 0".into())),
                album: Some("Album 0".into()),
                ..Default::default()
            },
        )
        .await
        .unwrap();

        // create a playlist with the song
        let playlist = Playlist {
            id: Playlist::generate_id(),
            name: "Playlist 0".into(),
            runtime: song.runtime,
            song_count: 1,
        };

        let result = Playlist::create(&db, playlist).await.unwrap().unwrap();

        Playlist::add_songs(&db, result.id, vec![song.id.clone()])
            .await
            .unwrap();

        // create a collection with the song
        let collection = Collection {
            id: Collection::generate_id(),
            name: "Collection 0".into(),
            runtime: song.runtime,
            song_count: 1,
        };

        let result = Collection::create(&db, collection).await.unwrap().unwrap();

        Collection::add_songs(&db, result.id, vec![song.id])
            .await
            .unwrap();

        db
    }

    /// Fixture: a client connected to an in-process server backed by the seeded `db` fixture,
    /// default settings, and a freshly started audio kernel.
    #[fixture]
    async fn client(#[future] db: Arc<Surreal<Db>>) -> MusicPlayerClient {
        let settings = Arc::new(Settings::default());
        let (tx, _) = std::sync::mpsc::channel();
        let audio_kernel = AudioKernelSender::start(tx);

        init_test_client_server(db.await, settings, audio_kernel)
            .await
            .unwrap()
    }

    /// The in-process client/server pair answers `ping` with "pong".
    #[tokio::test]
    async fn test_init_test_client_server() {
        let db = Arc::new(init_test_database().await.unwrap());
        let settings = Arc::new(Settings::default());
        let (tx, _) = std::sync::mpsc::channel();
        let audio_kernel = AudioKernelSender::start(tx);

        let client = init_test_client_server(db, settings, audio_kernel)
            .await
            .unwrap();

        let ctx = tarpc::context::current();
        let response = client.ping(ctx).await.unwrap();

        assert_eq!(response, "pong");

        // ensure that the client is shutdown properly
        drop(client);
    }

    /// The artists of the seeded song are exactly the artists in the library.
    #[rstest]
    #[tokio::test]
    async fn test_library_song_get_artist(#[future] client: MusicPlayerClient) -> Result<()> {
        let client = client.await;

        let ctx = tarpc::context::current();
        let library_full: LibraryFull = client.library_full(ctx).await??;

        let ctx = tarpc::context::current();
        let response = client
            .library_song_get_artist(ctx, library_full.songs.first().unwrap().id.clone().into())
            .await?;

        assert_eq!(response, library_full.artists.into_vec().into());

        Ok(())
    }

    /// The album of the seeded song is the first album in the library.
    #[rstest]
    #[tokio::test]
    async fn test_library_song_get_album(#[future] client: MusicPlayerClient) -> Result<()> {
        let client = client.await;

        let ctx = tarpc::context::current();
        let library_full: LibraryFull = client.library_full(ctx).await??;

        let ctx = tarpc::context::current();
        let response = client
            .library_song_get_album(ctx, library_full.songs.first().unwrap().id.clone().into())
            .await?
            .unwrap();

        assert_eq!(response, library_full.albums.first().unwrap().clone());

        Ok(())
    }

    /// The playlists containing the seeded song are exactly the playlists in the library.
    #[rstest]
    #[tokio::test]
    async fn test_library_song_get_playlists(#[future] client: MusicPlayerClient) -> Result<()> {
        let client = client.await;

        let ctx = tarpc::context::current();
        let library_full: LibraryFull = client.library_full(ctx).await??;

        let ctx = tarpc::context::current();
        let response = client
            .library_song_get_playlists(ctx, library_full.songs.first().unwrap().id.clone().into())
            .await?;

        assert_eq!(response, library_full.playlists.into_vec().into());

        Ok(())
    }

    /// The artists of the first album are exactly the artists in the library.
    #[rstest]
    #[tokio::test]
    async fn test_library_album_get_artist(#[future] client: MusicPlayerClient) -> Result<()> {
        let client = client.await;

        let ctx = tarpc::context::current();
        let library_full: LibraryFull = client.library_full(ctx).await??;

        let ctx = tarpc::context::current();
        let response = client
            .library_album_get_artist(ctx, library_full.albums.first().unwrap().id.clone().into())
            .await?;

        assert_eq!(response, library_full.artists.into_vec().into());

        Ok(())
    }

    /// The songs of the first album are exactly the songs in the library.
    #[rstest]
    #[tokio::test]
    async fn test_library_album_get_songs(#[future] client: MusicPlayerClient) -> Result<()> {
        let client = client.await;

        let ctx = tarpc::context::current();
        let library_full: LibraryFull = client.library_full(ctx).await??;

        let ctx = tarpc::context::current();
        let response = client
            .library_album_get_songs(ctx, library_full.albums.first().unwrap().id.clone().into())
            .await?
            .unwrap();

        assert_eq!(response, library_full.songs);

        Ok(())
    }

    /// The songs of the first artist are exactly the songs in the library.
    #[rstest]
    #[tokio::test]
    async fn test_library_artist_get_songs(#[future] client: MusicPlayerClient) -> Result<()> {
        let client = client.await;

        let ctx = tarpc::context::current();
        let library_full: LibraryFull = client.library_full(ctx).await??;

        let ctx = tarpc::context::current();
        let response = client
            .library_artist_get_songs(ctx, library_full.artists.first().unwrap().id.clone().into())
            .await?
            .unwrap();

        assert_eq!(response, library_full.songs);

        Ok(())
    }

    /// The albums of the first artist are exactly the albums in the library.
    #[rstest]
    #[tokio::test]
    async fn test_library_artist_get_albums(#[future] client: MusicPlayerClient) -> Result<()> {
        let client = client.await;

        let ctx = tarpc::context::current();
        let library_full: LibraryFull = client.library_full(ctx).await??;

        let ctx = tarpc::context::current();
        let response = client
            .library_artist_get_albums(ctx, library_full.artists.first().unwrap().id.clone().into())
            .await?
            .unwrap();

        assert_eq!(response, library_full.albums);

        Ok(())
    }

    /// Smoke test: the mute-toggle request completes without an RPC error.
    #[rstest]
    #[tokio::test]
    async fn test_playback_volume_toggle_mute(#[future] client: MusicPlayerClient) -> Result<()> {
        let client = client.await;

        let ctx = tarpc::context::current();

        client.playback_volume_toggle_mute(ctx).await?;
        Ok(())
    }

    /// Smoke test: the stop request completes without an RPC error.
    #[rstest]
    #[tokio::test]
    async fn test_playback_stop(#[future] client: MusicPlayerClient) -> Result<()> {
        let client = client.await;

        let ctx = tarpc::context::current();

        client.playback_stop(ctx).await?;
        Ok(())
    }

    /// Adding a list containing the seeded song to the queue succeeds.
    #[rstest]
    #[tokio::test]
    async fn test_queue_add_list(#[future] client: MusicPlayerClient) -> Result<()> {
        let client = client.await;

        let ctx = tarpc::context::current();
        let library_full: LibraryFull = client.library_full(ctx).await??;

        let ctx = tarpc::context::current();
        let response = client
            .queue_add_list(
                ctx,
                vec![library_full.songs.first().unwrap().id.clone().into()],
            )
            .await?;

        assert_eq!(response, Ok(()));

        Ok(())
    }

    /// `playlist_get_or_create` yields a playlist with the requested name, both for the
    /// name seeded by the `db` fixture ("Playlist 0") and for a new one ("Playlist 1").
    #[rstest]
    #[case::get(String::from("Playlist 0"))]
    #[case::create(String::from("Playlist 1"))]
    #[tokio::test]
    async fn test_playlist_get_or_create(
        #[future] client: MusicPlayerClient,
        #[case] name: String,
    ) -> Result<()> {
        let client = client.await;

        let ctx = tarpc::context::current();

        // get or create the playlist
        let playlist_id = client
            .playlist_get_or_create(ctx, name.clone())
            .await?
            .unwrap();

        // now get that playlist
        let ctx = tarpc::context::current();
        let playlist = client.playlist_get(ctx, playlist_id).await?.unwrap();

        assert_eq!(playlist.name, name.into());

        Ok(())
    }

    /// Cloning the seeded playlist produces a playlist named "Playlist 0 (copy)".
    #[rstest]
    #[tokio::test]
    async fn test_playlist_clone(#[future] client: MusicPlayerClient) -> Result<()> {
        let client = client.await;

        let ctx = tarpc::context::current();
        let library_full: LibraryFull = client.library_full(ctx).await??;

        // clone the only playlist in the db
        let ctx = tarpc::context::current();
        let playlist_id = client
            .playlist_clone(
                ctx,
                library_full.playlists.first().unwrap().id.clone().into(),
            )
            .await?
            .unwrap();

        // now get that playlist
        let ctx = tarpc::context::current();
        let playlist = client.playlist_get(ctx, playlist_id).await?.unwrap();

        assert_eq!(playlist.name, "Playlist 0 (copy)".into());

        Ok(())
    }

    /// The songs of the seeded playlist are exactly the songs in the library.
    #[rstest]
    #[tokio::test]
    async fn test_playlist_get_songs(#[future] client: MusicPlayerClient) -> Result<()> {
        let client = client.await;

        let ctx = tarpc::context::current();
        let library_full: LibraryFull = client.library_full(ctx).await??;

        // get the songs of the only playlist in the db
        // (fresh context per request, like the other tests)
        let ctx = tarpc::context::current();
        let response = client
            .playlist_get_songs(
                ctx,
                library_full.playlists.first().unwrap().id.clone().into(),
            )
            .await?
            .unwrap();

        assert_eq!(response, library_full.songs);

        Ok(())
    }

    /// Renaming the seeded playlist returns the renamed playlist, and a subsequent
    /// `playlist_get` observes the new name with all other fields unchanged.
    #[rstest]
    #[tokio::test]
    async fn test_playlist_rename(#[future] client: MusicPlayerClient) -> Result<()> {
        let client = client.await;

        let ctx = tarpc::context::current();
        let library_full: LibraryFull = client.library_full(ctx).await??;

        let target = library_full.playlists.first().unwrap();

        let ctx = tarpc::context::current();
        let response = client
            .playlist_rename(ctx, target.id.clone().into(), "New Name".into())
            .await?;

        let expected = Playlist {
            name: "New Name".into(),
            ..target.clone()
        };

        assert_eq!(response, Ok(expected.clone()));

        let ctx = tarpc::context::current();
        let response = client
            .playlist_get(ctx, target.id.clone().into())
            .await?
            .unwrap();

        assert_eq!(response, expected);
        Ok(())
    }

    /// The songs of the seeded collection are exactly the songs in the library.
    #[rstest]
    #[tokio::test]
    async fn test_collection_get_songs(#[future] client: MusicPlayerClient) -> Result<()> {
        let client = client.await;

        let ctx = tarpc::context::current();
        let library_full: LibraryFull = client.library_full(ctx).await??;

        // get the songs of the only collection in the db
        // (fresh context per request, like the other tests)
        let ctx = tarpc::context::current();
        let response = client
            .collection_get_songs(
                ctx,
                library_full.collections.first().unwrap().id.clone().into(),
            )
            .await?
            .unwrap();

        assert_eq!(response, library_full.songs);

        Ok(())
    }

    /// Creating a dynamic playlist from a valid query succeeds.
    #[rstest]
    #[tokio::test]
    async fn test_dynamic_playlist_create(#[future] client: MusicPlayerClient) -> Result<()> {
        let client = client.await;

        let ctx = tarpc::context::current();

        let query: Query = "artist CONTAINS \"Artist 0\"".parse()?;

        let response = client
            .dynamic_playlist_create(ctx, "Dynamic Playlist 0".into(), query)
            .await?;

        assert!(response.is_ok());

        Ok(())
    }

    /// After creating one dynamic playlist, listing returns exactly that playlist.
    #[rstest]
    #[tokio::test]
    async fn test_dynamic_playlist_list(#[future] client: MusicPlayerClient) -> Result<()> {
        let client = client.await;

        let ctx = tarpc::context::current();

        let query: Query = "artist CONTAINS \"Artist 0\"".parse()?;

        let dynamic_playlist_id = client
            .dynamic_playlist_create(ctx, "Dynamic Playlist 0".into(), query)
            .await?
            .unwrap();

        let ctx = tarpc::context::current();
        let response = client.dynamic_playlist_list(ctx).await?;

        assert_eq!(response.len(), 1);
        assert_eq!(response.first().unwrap().id, dynamic_playlist_id.into());

        Ok(())
    }

    /// Renaming a dynamic playlist via a change set returns the updated playlist, and a
    /// subsequent `dynamic_playlist_get` observes the same result (query unchanged).
    #[rstest]
    #[tokio::test]
    async fn test_dynamic_playlist_update(#[future] client: MusicPlayerClient) -> Result<()> {
        let client = client.await;

        let ctx = tarpc::context::current();

        let query: Query = "artist CONTAINS \"Artist 0\"".parse()?;

        let dynamic_playlist_id = client
            .dynamic_playlist_create(ctx, "Dynamic Playlist 0".into(), query.clone())
            .await?
            .unwrap();

        let ctx = tarpc::context::current();
        let response = client
            .dynamic_playlist_update(
                ctx,
                dynamic_playlist_id.clone(),
                DynamicPlaylistChangeSet::new().name("Dynamic Playlist 1"),
            )
            .await?;

        let expected = DynamicPlaylist {
            id: dynamic_playlist_id.clone().into(),
            name: "Dynamic Playlist 1".into(),
            query: query.clone(),
        };

        assert_eq!(response, Ok(expected.clone()));

        let ctx = tarpc::context::current();
        let response = client
            .dynamic_playlist_get(ctx, dynamic_playlist_id)
            .await?
            .unwrap();

        assert_eq!(response, expected);

        Ok(())
    }

    /// Removing the only dynamic playlist succeeds and leaves the list empty.
    #[rstest]
    #[tokio::test]
    async fn test_dynamic_playlist_remove(#[future] client: MusicPlayerClient) -> Result<()> {
        let client = client.await;

        let ctx = tarpc::context::current();

        let query: Query = "artist CONTAINS \"Artist 0\"".parse()?;

        let dynamic_playlist_id = client
            .dynamic_playlist_create(ctx, "Dynamic Playlist 0".into(), query)
            .await?
            .unwrap();

        let ctx = tarpc::context::current();
        let response = client
            .dynamic_playlist_remove(ctx, dynamic_playlist_id)
            .await?;

        assert_eq!(response, Ok(()));

        let ctx = tarpc::context::current();
        let response = client.dynamic_playlist_list(ctx).await?;

        assert_eq!(response.len(), 0);

        Ok(())
    }

    /// A created dynamic playlist can be fetched back with the name and query it was created with.
    #[rstest]
    #[tokio::test]
    async fn test_dynamic_playlist_get(#[future] client: MusicPlayerClient) -> Result<()> {
        let client = client.await;

        let ctx = tarpc::context::current();

        let query: Query = "artist CONTAINS \"Artist 0\"".parse()?;

        let dynamic_playlist_id = client
            .dynamic_playlist_create(ctx, "Dynamic Playlist 0".into(), query.clone())
            .await?
            .unwrap();

        let ctx = tarpc::context::current();
        let response = client
            .dynamic_playlist_get(ctx, dynamic_playlist_id)
            .await?
            .unwrap();

        assert_eq!(response.name, "Dynamic Playlist 0".into());
        assert_eq!(response.query, query);

        Ok(())
    }

    /// A dynamic playlist matching "Artist 0" resolves to the single seeded song.
    #[rstest]
    #[tokio::test]
    async fn test_dynamic_playlist_get_songs(#[future] client: MusicPlayerClient) -> Result<()> {
        let client = client.await;

        let ctx = tarpc::context::current();

        let query: Query = "artist CONTAINS \"Artist 0\"".parse()?;

        let dynamic_playlist_id = client
            .dynamic_playlist_create(ctx, "Dynamic Playlist 0".into(), query)
            .await?
            .unwrap();

        let ctx = tarpc::context::current();
        let response = client
            .dynamic_playlist_get_songs(ctx, dynamic_playlist_id)
            .await?
            .unwrap();

        assert_eq!(response.len(), 1);

        Ok(())
    }
}