1#![deny(clippy::missing_inline_in_public_items)]
2
3use std::{
5 net::{IpAddr, Ipv4Addr},
6 sync::Arc,
7};
8use futures::{
10 FutureExt, future,
11 prelude::*,
12 stream::{AbortHandle, Abortable},
13};
14use log::{error, info};
15use surrealdb::{Surreal, engine::local::Db};
16use tarpc::{
17 self,
18 server::{BaseChannel, Channel as _, incoming::Incoming as _},
19 tokio_serde::formats::Json,
20};
21use mecomp_core::{
23 audio::{AudioKernelSender, commands::AudioCommand},
24 config::Settings,
25 is_server_running,
26 logger::{init_logger, init_tracing},
27 rpc::{MusicPlayer as _, MusicPlayerClient},
28 udp::{Message, Sender},
29};
30use mecomp_storage::db::{init_database, set_database_path};
31use tokio::sync::RwLock;
32
33async fn spawn(fut: impl Future<Output = ()> + Send + 'static) {
34 tokio::spawn(fut);
35}
36
37pub mod controller;
38#[cfg(feature = "dynamic_updates")]
39pub mod dynamic_updates;
40pub mod services;
41mod termination;
42#[cfg(test)]
43pub use mecomp_core::test_utils;
44
45use crate::controller::MusicPlayerServer;
46
47#[inline]
68#[allow(clippy::redundant_pub_crate)]
69pub async fn start_daemon(
70 settings: Settings,
71 db_dir: std::path::PathBuf,
72 log_file_path: Option<std::path::PathBuf>,
73) -> anyhow::Result<()> {
74 let settings = Arc::new(settings);
76
77 if is_server_running(settings.daemon.rpc_port) {
79 anyhow::bail!(
80 "A server is already running on port {}",
81 settings.daemon.rpc_port
82 );
83 }
84
85 init_logger(settings.daemon.log_level, log_file_path);
87 set_database_path(db_dir)?;
88 let db = Arc::new(init_database().await?);
89 tracing::subscriber::set_global_default(init_tracing())?;
90
91 #[cfg(feature = "dynamic_updates")]
93 let guard = dynamic_updates::init_music_library_watcher(
94 db.clone(),
95 &settings.daemon.library_paths,
96 settings.daemon.artist_separator.clone(),
97 settings.daemon.protected_artist_names.clone(),
98 settings.daemon.genre_separator.clone(),
99 )?;
100
101 let (event_tx, event_rx) = std::sync::mpsc::channel();
103 let event_publisher = Arc::new(RwLock::new(Sender::new().await?));
104
105 let (terminator, mut interrupt_rx) = termination::create_termination();
107
108 let audio_kernel = AudioKernelSender::start(event_tx);
110
111 let server = MusicPlayerServer::new(
113 db.clone(),
114 settings.clone(),
115 audio_kernel.clone(),
116 event_publisher.clone(),
117 terminator.clone(),
118 );
119
120 let eft_guard = {
124 let event_publisher = event_publisher.clone();
125 tokio::spawn(async move {
126 while let Ok(event) = event_rx.recv() {
127 event_publisher
128 .read()
129 .await
130 .send(Message::StateChange(event))
131 .await
132 .unwrap();
133 }
134 })
135 };
136
137 let server_addr = (IpAddr::V4(Ipv4Addr::LOCALHOST), settings.daemon.rpc_port);
139
140 let mut listener = tarpc::serde_transport::tcp::listen(&server_addr, Json::default).await?;
141 info!("Listening on {}", listener.local_addr());
142 listener.config_mut().max_frame_length(usize::MAX);
143 let server_handle = listener
144 .filter_map(|r| future::ready(r.ok()))
146 .map(BaseChannel::with_defaults)
147 .max_channels_per_key(10, |t| t.transport().peer_addr().unwrap().ip())
149 .map(|channel| channel.execute(server.clone().serve()).for_each(spawn))
153 .buffer_unordered(10)
158 .for_each(async |()| {})
159 .fuse();
161 let (abort_handle, abort_registration) = AbortHandle::new_pair();
163 let abortable_server_handle = Abortable::new(server_handle, abort_registration);
164
165 tokio::select! {
167 _ = abortable_server_handle => {
168 error!("Server stopped unexpectedly");
169 },
170 reason = interrupt_rx.recv() => {
173 match reason {
174 Ok(termination::Interrupted::UserInt) => info!("Stopping server per user request"),
175 Ok(termination::Interrupted::OsSigInt) => info!("Stopping server because of an os sig int"),
176 Ok(termination::Interrupted::OsSigTerm) => info!("Stopping server because of an os sig term"),
177 Ok(termination::Interrupted::OsSigQuit) => info!("Stopping server because of an os sig quit"),
178 Err(e) => error!("Stopping server because of an unexpected error: {e}"),
179 }
180 }
181 }
182
183 abort_handle.abort();
185
186 audio_kernel.send(AudioCommand::Exit);
188
189 #[cfg(feature = "dynamic_updates")]
190 guard.stop();
191
192 let _ = event_publisher
194 .read()
195 .await
196 .send(Message::Event(mecomp_core::udp::Event::DaemonShutdown))
197 .await;
198 eft_guard.abort();
199
200 Ok(())
201}
202
203#[inline]
210pub async fn init_test_client_server(
211 db: Arc<Surreal<Db>>,
212 settings: Arc<Settings>,
213 audio_kernel: Arc<AudioKernelSender>,
214) -> anyhow::Result<MusicPlayerClient> {
215 let (client_transport, server_transport) = tarpc::transport::channel::unbounded();
216
217 let event_publisher = Arc::new(RwLock::new(Sender::new().await?));
218 let (terminator, mut interrupt_rx) = termination::create_termination();
220 #[allow(clippy::redundant_pub_crate)]
221 tokio::spawn(async move {
222 let server = MusicPlayerServer::new(
223 db,
224 settings,
225 audio_kernel.clone(),
226 event_publisher.clone(),
227 terminator,
228 );
229 tokio::select! {
230 () = tarpc::server::BaseChannel::with_defaults(server_transport)
231 .execute(server.serve())
232 .for_each(async |response| {
234 tokio::spawn(response);
235 }) => {},
236 _ = interrupt_rx.recv() => {
238 info!("Stopping server...");
240 audio_kernel.send(AudioCommand::Exit);
241 let _ = event_publisher.read().await.send(Message::Event(mecomp_core::udp::Event::DaemonShutdown)).await;
242 info!("Server stopped");
243 }
244 }
245 });
246
247 Ok(MusicPlayerClient::new(tarpc::client::Config::default(), client_transport).spawn())
250}
251
/// Smoke tests that drive the RPC surface through the in-process client returned by
/// `init_test_client_server`.
///
/// The `db` fixture seeds exactly one song ("Artist 0" / "Album 0"), one playlist
/// ("Playlist 0") and one collection ("Collection 0"), both containing that song, so
/// most tests compare an RPC response against the matching part of `library_full`.
#[cfg(test)]
mod test_client_tests {
    use super::*;
    use anyhow::Result;
    use mecomp_core::state::library::LibraryFull;
    use mecomp_storage::{
        db::schemas::{
            collection::Collection,
            dynamic::{DynamicPlaylist, DynamicPlaylistChangeSet, query::Query},
            playlist::Playlist,
            song::SongChangeSet,
        },
        test_utils::{SongCase, create_song_with_overrides, init_test_database},
    };

    use pretty_assertions::assert_eq;
    use rstest::{fixture, rstest};

    /// A test database holding one song, one playlist and one collection.
    #[fixture]
    async fn db() -> Arc<Surreal<Db>> {
        let db = Arc::new(init_test_database().await.unwrap());

        // NOTE(review): the meaning of the individual `SongCase::new` arguments is not
        // visible from this file — confirm against `mecomp_storage::test_utils`.
        let song_case = SongCase::new(0, vec![0], vec![0], 0, 0);

        // Create the single song, overriding artist/album so tests can refer to them by name.
        let song = create_song_with_overrides(
            &db,
            song_case,
            SongChangeSet {
                artist: Some(one_or_many::OneOrMany::One("Artist 0".into())),
                album_artist: Some(one_or_many::OneOrMany::One("Artist 0".into())),
                album: Some("Album 0".into()),
                ..Default::default()
            },
        )
        .await
        .unwrap();

        // A playlist containing the song.
        let playlist = Playlist {
            id: Playlist::generate_id(),
            name: "Playlist 0".into(),
            runtime: song.runtime,
            song_count: 1,
        };

        let result = Playlist::create(&db, playlist).await.unwrap().unwrap();

        Playlist::add_songs(&db, result.id, vec![song.id.clone()])
            .await
            .unwrap();

        // A collection containing the song.
        let collection = Collection {
            id: Collection::generate_id(),
            name: "Collection 0".into(),
            runtime: song.runtime,
            song_count: 1,
        };

        let result = Collection::create(&db, collection).await.unwrap().unwrap();

        Collection::add_songs(&db, result.id, vec![song.id])
            .await
            .unwrap();

        db
    }

    /// A client connected to an in-process server backed by the `db` fixture.
    #[fixture]
    async fn client(#[future] db: Arc<Surreal<Db>>) -> MusicPlayerClient {
        let settings = Arc::new(Settings::default());
        // The receiving half is dropped immediately; these tests do not observe audio events.
        let (tx, _) = std::sync::mpsc::channel();
        let audio_kernel = AudioKernelSender::start(tx);

        init_test_client_server(db.await, settings, audio_kernel)
            .await
            .unwrap()
    }

    /// The test server comes up against an unseeded test database and answers `ping`.
    #[tokio::test]
    async fn test_init_test_client_server() {
        let db = Arc::new(init_test_database().await.unwrap());
        let settings = Arc::new(Settings::default());
        let (tx, _) = std::sync::mpsc::channel();
        let audio_kernel = AudioKernelSender::start(tx);

        let client = init_test_client_server(db, settings, audio_kernel)
            .await
            .unwrap();

        let ctx = tarpc::context::current();
        let response = client.ping(ctx).await.unwrap();

        assert_eq!(response, "pong");

        drop(client);
    }

    /// The only song's artists are all of the library's artists.
    #[rstest]
    #[tokio::test]
    async fn test_library_song_get_artist(#[future] client: MusicPlayerClient) -> Result<()> {
        let client = client.await;

        let ctx = tarpc::context::current();
        let library_full: LibraryFull = client.library_full(ctx).await??;

        let ctx = tarpc::context::current();
        let response = client
            .library_song_get_artist(ctx, library_full.songs.first().unwrap().id.clone().into())
            .await?;

        assert_eq!(response, library_full.artists.into_vec().into());

        Ok(())
    }

    /// The only song's album is the library's first album.
    #[rstest]
    #[tokio::test]
    async fn test_library_song_get_album(#[future] client: MusicPlayerClient) -> Result<()> {
        let client = client.await;

        let ctx = tarpc::context::current();
        let library_full: LibraryFull = client.library_full(ctx).await??;

        let ctx = tarpc::context::current();
        let response = client
            .library_song_get_album(ctx, library_full.songs.first().unwrap().id.clone().into())
            .await?
            .unwrap();

        assert_eq!(response, library_full.albums.first().unwrap().clone());

        Ok(())
    }

    /// The only song's playlists are all of the library's playlists.
    #[rstest]
    #[tokio::test]
    async fn test_library_song_get_playlists(#[future] client: MusicPlayerClient) -> Result<()> {
        let client = client.await;

        let ctx = tarpc::context::current();
        let library_full: LibraryFull = client.library_full(ctx).await??;

        let ctx = tarpc::context::current();
        let response = client
            .library_song_get_playlists(ctx, library_full.songs.first().unwrap().id.clone().into())
            .await?;

        assert_eq!(response, library_full.playlists.into_vec().into());

        Ok(())
    }

    /// The first album's artists are all of the library's artists.
    #[rstest]
    #[tokio::test]
    async fn test_library_album_get_artist(#[future] client: MusicPlayerClient) -> Result<()> {
        let client = client.await;

        let ctx = tarpc::context::current();
        let library_full: LibraryFull = client.library_full(ctx).await??;

        let ctx = tarpc::context::current();
        let response = client
            .library_album_get_artist(ctx, library_full.albums.first().unwrap().id.clone().into())
            .await?;

        assert_eq!(response, library_full.artists.into_vec().into());

        Ok(())
    }

    /// The first album's songs are all of the library's songs.
    #[rstest]
    #[tokio::test]
    async fn test_library_album_get_songs(#[future] client: MusicPlayerClient) -> Result<()> {
        let client = client.await;

        let ctx = tarpc::context::current();
        let library_full: LibraryFull = client.library_full(ctx).await??;

        let ctx = tarpc::context::current();
        let response = client
            .library_album_get_songs(ctx, library_full.albums.first().unwrap().id.clone().into())
            .await?
            .unwrap();

        assert_eq!(response, library_full.songs);

        Ok(())
    }

    /// The first artist's songs are all of the library's songs.
    #[rstest]
    #[tokio::test]
    async fn test_library_artist_get_songs(#[future] client: MusicPlayerClient) -> Result<()> {
        let client = client.await;

        let ctx = tarpc::context::current();
        let library_full: LibraryFull = client.library_full(ctx).await??;

        let ctx = tarpc::context::current();
        let response = client
            .library_artist_get_songs(ctx, library_full.artists.first().unwrap().id.clone().into())
            .await?
            .unwrap();

        assert_eq!(response, library_full.songs);

        Ok(())
    }

    /// The first artist's albums are all of the library's albums.
    #[rstest]
    #[tokio::test]
    async fn test_library_artist_get_albums(#[future] client: MusicPlayerClient) -> Result<()> {
        let client = client.await;

        let ctx = tarpc::context::current();
        let library_full: LibraryFull = client.library_full(ctx).await??;

        let ctx = tarpc::context::current();
        let response = client
            .library_artist_get_albums(ctx, library_full.artists.first().unwrap().id.clone().into())
            .await?
            .unwrap();

        assert_eq!(response, library_full.albums);

        Ok(())
    }

    /// Toggling mute completes without an RPC error.
    #[rstest]
    #[tokio::test]
    async fn test_playback_volume_toggle_mute(#[future] client: MusicPlayerClient) -> Result<()> {
        let client = client.await;

        let ctx = tarpc::context::current();

        client.playback_volume_toggle_mute(ctx).await?;
        Ok(())
    }

    /// Stopping playback completes without an RPC error.
    #[rstest]
    #[tokio::test]
    async fn test_playback_stop(#[future] client: MusicPlayerClient) -> Result<()> {
        let client = client.await;

        let ctx = tarpc::context::current();

        client.playback_stop(ctx).await?;
        Ok(())
    }

    /// Adding the only song to the queue succeeds.
    #[rstest]
    #[tokio::test]
    async fn test_queue_add_list(#[future] client: MusicPlayerClient) -> Result<()> {
        let client = client.await;

        let ctx = tarpc::context::current();
        let library_full: LibraryFull = client.library_full(ctx).await??;

        let ctx = tarpc::context::current();
        let response = client
            .queue_add_list(
                ctx,
                vec![library_full.songs.first().unwrap().id.clone().into()],
            )
            .await?;

        assert_eq!(response, Ok(()));

        Ok(())
    }

    /// `playlist_get_or_create` yields a playlist with the requested name, both for the
    /// name seeded by the fixture ("Playlist 0") and for a name that is not seeded.
    #[rstest]
    #[case::get(String::from("Playlist 0"))]
    #[case::create(String::from("Playlist 1"))]
    #[tokio::test]
    async fn test_playlist_get_or_create(
        #[future] client: MusicPlayerClient,
        #[case] name: String,
    ) -> Result<()> {
        let client = client.await;

        let ctx = tarpc::context::current();

        let playlist_id = client
            .playlist_get_or_create(ctx, name.clone())
            .await?
            .unwrap();

        // Fetch the playlist back and check its name.
        let ctx = tarpc::context::current();
        let playlist = client.playlist_get(ctx, playlist_id).await?.unwrap();

        assert_eq!(playlist.name, name);

        Ok(())
    }

    /// Cloning "Playlist 0" produces a playlist named "Playlist 0 (copy)".
    #[rstest]
    #[tokio::test]
    async fn test_playlist_clone(#[future] client: MusicPlayerClient) -> Result<()> {
        let client = client.await;

        let ctx = tarpc::context::current();
        let library_full: LibraryFull = client.library_full(ctx).await??;

        let ctx = tarpc::context::current();
        let playlist_id = client
            .playlist_clone(
                ctx,
                library_full.playlists.first().unwrap().id.clone().into(),
            )
            .await?
            .unwrap();

        // Fetch the clone and check its name.
        let ctx = tarpc::context::current();
        let playlist = client.playlist_get(ctx, playlist_id).await?.unwrap();

        assert_eq!(playlist.name, "Playlist 0 (copy)");

        Ok(())
    }

    /// The first playlist's songs are all of the library's songs.
    #[rstest]
    #[tokio::test]
    async fn test_playlist_get_songs(#[future] client: MusicPlayerClient) -> Result<()> {
        let client = client.await;

        let ctx = tarpc::context::current();
        let library_full: LibraryFull = client.library_full(ctx).await??;

        // Fresh context (and deadline) for the second request, like every other test.
        let ctx = tarpc::context::current();
        let response = client
            .playlist_get_songs(
                ctx,
                library_full.playlists.first().unwrap().id.clone().into(),
            )
            .await?
            .unwrap();

        assert_eq!(response, library_full.songs);

        Ok(())
    }

    /// Renaming a playlist returns the renamed playlist and the change is visible on a
    /// subsequent `playlist_get`.
    #[rstest]
    #[tokio::test]
    async fn test_playlist_rename(#[future] client: MusicPlayerClient) -> Result<()> {
        let client = client.await;

        let ctx = tarpc::context::current();
        let library_full: LibraryFull = client.library_full(ctx).await??;

        let target = library_full.playlists.first().unwrap();

        let ctx = tarpc::context::current();
        let response = client
            .playlist_rename(ctx, target.id.clone().into(), "New Name".into())
            .await?;

        // Only the name is expected to change.
        let expected = Playlist {
            name: "New Name".into(),
            ..target.clone()
        };

        assert_eq!(response, Ok(expected.clone()));

        let ctx = tarpc::context::current();
        let response = client
            .playlist_get(ctx, target.id.clone().into())
            .await?
            .unwrap();

        assert_eq!(response, expected);
        Ok(())
    }

    /// The first collection's songs are all of the library's songs.
    #[rstest]
    #[tokio::test]
    async fn test_collection_get_songs(#[future] client: MusicPlayerClient) -> Result<()> {
        let client = client.await;

        let ctx = tarpc::context::current();
        let library_full: LibraryFull = client.library_full(ctx).await??;

        // Fresh context (and deadline) for the second request, like every other test.
        let ctx = tarpc::context::current();
        let response = client
            .collection_get_songs(
                ctx,
                library_full.collections.first().unwrap().id.clone().into(),
            )
            .await?
            .unwrap();

        assert_eq!(response, library_full.songs);

        Ok(())
    }

    /// Creating a dynamic playlist from a parsed query succeeds.
    #[rstest]
    #[tokio::test]
    async fn test_dynamic_playlist_create(#[future] client: MusicPlayerClient) -> Result<()> {
        let client = client.await;

        let ctx = tarpc::context::current();

        let query: Query = "artist CONTAINS \"Artist 0\"".parse()?;

        let response = client
            .dynamic_playlist_create(ctx, "Dynamic Playlist 0".into(), query)
            .await?;

        assert!(response.is_ok());

        Ok(())
    }

    /// After creating one dynamic playlist, the list contains exactly that playlist.
    #[rstest]
    #[tokio::test]
    async fn test_dynamic_playlist_list(#[future] client: MusicPlayerClient) -> Result<()> {
        let client = client.await;

        let ctx = tarpc::context::current();

        let query: Query = "artist CONTAINS \"Artist 0\"".parse()?;

        let dynamic_playlist_id = client
            .dynamic_playlist_create(ctx, "Dynamic Playlist 0".into(), query)
            .await?
            .unwrap();

        let ctx = tarpc::context::current();
        let response = client.dynamic_playlist_list(ctx).await?;

        assert_eq!(response.len(), 1);
        assert_eq!(response.first().unwrap().id, dynamic_playlist_id.into());

        Ok(())
    }

    /// Renaming a dynamic playlist returns the updated playlist (same id and query) and
    /// the change is visible on a subsequent `dynamic_playlist_get`.
    #[rstest]
    #[tokio::test]
    async fn test_dynamic_playlist_update(#[future] client: MusicPlayerClient) -> Result<()> {
        let client = client.await;

        let ctx = tarpc::context::current();

        let query: Query = "artist CONTAINS \"Artist 0\"".parse()?;

        let dynamic_playlist_id = client
            .dynamic_playlist_create(ctx, "Dynamic Playlist 0".into(), query.clone())
            .await?
            .unwrap();

        let ctx = tarpc::context::current();
        let response = client
            .dynamic_playlist_update(
                ctx,
                dynamic_playlist_id.clone(),
                DynamicPlaylistChangeSet::new().name("Dynamic Playlist 1"),
            )
            .await?;

        let expected = DynamicPlaylist {
            id: dynamic_playlist_id.clone().into(),
            name: "Dynamic Playlist 1".into(),
            query: query.clone(),
        };

        assert_eq!(response, Ok(expected.clone()));

        let ctx = tarpc::context::current();
        let response = client
            .dynamic_playlist_get(ctx, dynamic_playlist_id)
            .await?
            .unwrap();

        assert_eq!(response, expected);

        Ok(())
    }

    /// Removing the only dynamic playlist leaves the list empty.
    #[rstest]
    #[tokio::test]
    async fn test_dynamic_playlist_remove(#[future] client: MusicPlayerClient) -> Result<()> {
        let client = client.await;

        let ctx = tarpc::context::current();

        let query: Query = "artist CONTAINS \"Artist 0\"".parse()?;

        let dynamic_playlist_id = client
            .dynamic_playlist_create(ctx, "Dynamic Playlist 0".into(), query)
            .await?
            .unwrap();

        let ctx = tarpc::context::current();
        let response = client
            .dynamic_playlist_remove(ctx, dynamic_playlist_id)
            .await?;

        assert_eq!(response, Ok(()));

        let ctx = tarpc::context::current();
        let response = client.dynamic_playlist_list(ctx).await?;

        assert_eq!(response.len(), 0);

        Ok(())
    }

    /// A created dynamic playlist can be fetched back with the same name and query.
    #[rstest]
    #[tokio::test]
    async fn test_dynamic_playlist_get(#[future] client: MusicPlayerClient) -> Result<()> {
        let client = client.await;

        let ctx = tarpc::context::current();

        let query: Query = "artist CONTAINS \"Artist 0\"".parse()?;

        let dynamic_playlist_id = client
            .dynamic_playlist_create(ctx, "Dynamic Playlist 0".into(), query.clone())
            .await?
            .unwrap();

        let ctx = tarpc::context::current();
        let response = client
            .dynamic_playlist_get(ctx, dynamic_playlist_id)
            .await?
            .unwrap();

        assert_eq!(response.name, "Dynamic Playlist 0");
        assert_eq!(response.query, query);

        Ok(())
    }

    /// A dynamic playlist matching "Artist 0" resolves to the single seeded song.
    #[rstest]
    #[tokio::test]
    async fn test_dynamic_playlist_get_songs(#[future] client: MusicPlayerClient) -> Result<()> {
        let client = client.await;

        let ctx = tarpc::context::current();

        let query: Query = "artist CONTAINS \"Artist 0\"".parse()?;

        let dynamic_playlist_id = client
            .dynamic_playlist_create(ctx, "Dynamic Playlist 0".into(), query)
            .await?
            .unwrap();

        let ctx = tarpc::context::current();
        let response = client
            .dynamic_playlist_get_songs(ctx, dynamic_playlist_id)
            .await?
            .unwrap();

        assert_eq!(response.len(), 1);

        Ok(())
    }
}