1#![deny(clippy::missing_inline_in_public_items)]
2
3use std::{
5 net::{IpAddr, Ipv4Addr},
6 sync::Arc,
7};
8use futures::{future, prelude::*};
10use log::info;
11use surrealdb::{engine::local::Db, Surreal};
12use tarpc::{
13 self,
14 server::{incoming::Incoming as _, BaseChannel, Channel as _},
15 tokio_serde::formats::Json,
16};
17use mecomp_core::{
19 audio::AudioKernelSender,
20 config::Settings,
21 is_server_running,
22 logger::{init_logger, init_tracing},
23 rpc::{MusicPlayer as _, MusicPlayerClient},
24 udp::{Message, Sender},
25};
26use mecomp_storage::db::{init_database, set_database_path};
27use tokio::sync::RwLock;
28
29async fn spawn(fut: impl Future<Output = ()> + Send + 'static) {
30 tokio::spawn(fut);
31}
32
33pub mod controller;
34#[cfg(feature = "dynamic_updates")]
35pub mod dynamic_updates;
36pub mod services;
37#[cfg(test)]
38pub use mecomp_core::test_utils;
39
40use crate::controller::MusicPlayerServer;
41
42#[inline]
63pub async fn start_daemon(
64 settings: Settings,
65 db_dir: std::path::PathBuf,
66 log_file_path: Option<std::path::PathBuf>,
67) -> anyhow::Result<()> {
68 let settings = Arc::new(settings);
70
71 if is_server_running(settings.daemon.rpc_port) {
73 anyhow::bail!(
74 "A server is already running on port {}",
75 settings.daemon.rpc_port
76 );
77 }
78
79 init_logger(settings.daemon.log_level, log_file_path);
81 set_database_path(db_dir)?;
82 let db = Arc::new(init_database().await?);
83 tracing::subscriber::set_global_default(init_tracing())?;
84
85 #[cfg(feature = "dynamic_updates")]
87 let guard = dynamic_updates::init_music_library_watcher(
88 db.clone(),
89 &settings.daemon.library_paths,
90 settings.daemon.artist_separator.clone(),
91 settings.daemon.genre_separator.clone(),
92 )?;
93
94 let (event_tx, event_rx) = std::sync::mpsc::channel();
96 let audio_kernel = AudioKernelSender::start(event_tx);
97 let event_publisher = Arc::new(RwLock::new(Sender::new().await?));
98 let server = MusicPlayerServer::new(
99 db.clone(),
100 settings.clone(),
101 audio_kernel.clone(),
102 event_publisher.clone(),
103 );
104
105 let eft_guard = tokio::spawn(async move {
109 while let Ok(event) = event_rx.recv() {
110 event_publisher
111 .read()
112 .await
113 .send(Message::StateChange(event))
114 .await
115 .unwrap();
116 }
117 });
118
119 let server_addr = (IpAddr::V4(Ipv4Addr::LOCALHOST), settings.daemon.rpc_port);
123
124 let mut listener = tarpc::serde_transport::tcp::listen(&server_addr, Json::default).await?;
125 info!("Listening on {}", listener.local_addr());
126 listener.config_mut().max_frame_length(usize::MAX);
127 listener
128 .filter_map(|r| future::ready(r.ok()))
130 .map(BaseChannel::with_defaults)
131 .max_channels_per_key(10, |t| t.transport().peer_addr().unwrap().ip())
133 .map(|channel| channel.execute(server.clone().serve()).for_each(spawn))
137 .buffer_unordered(10)
142 .for_each(|()| async {})
143 .await;
144
145 #[cfg(feature = "dynamic_updates")]
146 guard.stop();
147
148 eft_guard.abort();
149
150 Ok(())
151}
152
153#[inline]
160pub async fn init_test_client_server(
161 db: Arc<Surreal<Db>>,
162 settings: Arc<Settings>,
163 audio_kernel: Arc<AudioKernelSender>,
164) -> anyhow::Result<MusicPlayerClient> {
165 let (client_transport, server_transport) = tarpc::transport::channel::unbounded();
166
167 let event_publisher = Arc::new(RwLock::new(Sender::new().await?));
168 let server = MusicPlayerServer::new(db, settings, audio_kernel, event_publisher);
169 tokio::spawn(
170 tarpc::server::BaseChannel::with_defaults(server_transport)
171 .execute(server.serve())
172 .for_each(|response| async move {
174 tokio::spawn(response);
175 }),
176 );
177
178 Ok(MusicPlayerClient::new(tarpc::client::Config::default(), client_transport).spawn())
181}
182
183#[cfg(test)]
184mod test_client_tests {
185 use super::*;
190 use anyhow::Result;
191 use mecomp_core::state::library::LibraryFull;
192 use mecomp_storage::{
193 db::schemas::{
194 collection::Collection,
195 dynamic::{query::Query, DynamicPlaylist, DynamicPlaylistChangeSet},
196 playlist::Playlist,
197 song::SongChangeSet,
198 },
199 test_utils::{create_song_with_overrides, init_test_database, SongCase},
200 };
201
202 use pretty_assertions::assert_eq;
203 use rstest::{fixture, rstest};
204
205 #[fixture]
206 async fn db() -> Arc<Surreal<Db>> {
207 let db = Arc::new(init_test_database().await.unwrap());
208
209 let song_case = SongCase::new(0, vec![0], vec![0], 0, 0);
212
213 let song = create_song_with_overrides(
215 &db,
216 song_case,
217 SongChangeSet {
218 artist: Some(one_or_many::OneOrMany::One("Artist 0".into())),
220 album_artist: Some(one_or_many::OneOrMany::One("Artist 0".into())),
221 album: Some("Album 0".into()),
222 ..Default::default()
223 },
224 )
225 .await
226 .unwrap();
227
228 let playlist = Playlist {
230 id: Playlist::generate_id(),
231 name: "Playlist 0".into(),
232 runtime: song.runtime,
233 song_count: 1,
234 };
235
236 let result = Playlist::create(&db, playlist).await.unwrap().unwrap();
237
238 Playlist::add_songs(&db, result.id, vec![song.id.clone()])
239 .await
240 .unwrap();
241
242 let collection = Collection {
244 id: Collection::generate_id(),
245 name: "Collection 0".into(),
246 runtime: song.runtime,
247 song_count: 1,
248 };
249
250 let result = Collection::create(&db, collection).await.unwrap().unwrap();
251
252 Collection::add_songs(&db, result.id, vec![song.id])
253 .await
254 .unwrap();
255
256 return db;
257 }
258
259 #[fixture]
260 async fn client(#[future] db: Arc<Surreal<Db>>) -> MusicPlayerClient {
261 let settings = Arc::new(Settings::default());
262 let (tx, _) = std::sync::mpsc::channel();
263 let audio_kernel = AudioKernelSender::start(tx);
264
265 init_test_client_server(db.await, settings, audio_kernel)
266 .await
267 .unwrap()
268 }
269
270 #[tokio::test]
271 async fn test_init_test_client_server() {
272 let db = Arc::new(init_test_database().await.unwrap());
273 let settings = Arc::new(Settings::default());
274 let (tx, _) = std::sync::mpsc::channel();
275 let audio_kernel = AudioKernelSender::start(tx);
276
277 let client = init_test_client_server(db, settings, audio_kernel)
278 .await
279 .unwrap();
280
281 let ctx = tarpc::context::current();
282 let response = client.ping(ctx).await.unwrap();
283
284 assert_eq!(response, "pong");
285
286 drop(client);
288 }
289
290 #[rstest]
291 #[tokio::test]
292 async fn test_library_song_get_artist(#[future] client: MusicPlayerClient) -> Result<()> {
293 let client = client.await;
294
295 let ctx = tarpc::context::current();
296 let library_full: LibraryFull = client.library_full(ctx).await??;
297
298 let ctx = tarpc::context::current();
299 let response = client
300 .library_song_get_artist(ctx, library_full.songs.first().unwrap().id.clone().into())
301 .await?;
302
303 assert_eq!(response, library_full.artists.into_vec().into());
304
305 Ok(())
306 }
307
308 #[rstest]
309 #[tokio::test]
310 async fn test_library_song_get_album(#[future] client: MusicPlayerClient) -> Result<()> {
311 let client = client.await;
312
313 let ctx = tarpc::context::current();
314 let library_full: LibraryFull = client.library_full(ctx).await??;
315
316 let ctx = tarpc::context::current();
317 let response = client
318 .library_song_get_album(ctx, library_full.songs.first().unwrap().id.clone().into())
319 .await?
320 .unwrap();
321
322 assert_eq!(response, library_full.albums.first().unwrap().clone());
323
324 Ok(())
325 }
326
327 #[rstest]
328 #[tokio::test]
329 async fn test_library_song_get_playlists(#[future] client: MusicPlayerClient) -> Result<()> {
330 let client = client.await;
331
332 let ctx = tarpc::context::current();
333 let library_full: LibraryFull = client.library_full(ctx).await??;
334
335 let ctx = tarpc::context::current();
336 let response = client
337 .library_song_get_playlists(ctx, library_full.songs.first().unwrap().id.clone().into())
338 .await?;
339
340 assert_eq!(response, library_full.playlists.into_vec().into());
341
342 Ok(())
343 }
344
345 #[rstest]
346 #[tokio::test]
347 async fn test_library_album_get_artist(#[future] client: MusicPlayerClient) -> Result<()> {
348 let client = client.await;
349
350 let ctx = tarpc::context::current();
351 let library_full: LibraryFull = client.library_full(ctx).await??;
352
353 let ctx = tarpc::context::current();
354 let response = client
355 .library_album_get_artist(ctx, library_full.albums.first().unwrap().id.clone().into())
356 .await?;
357
358 assert_eq!(response, library_full.artists.into_vec().into());
359
360 Ok(())
361 }
362
363 #[rstest]
364 #[tokio::test]
365 async fn test_library_album_get_songs(#[future] client: MusicPlayerClient) -> Result<()> {
366 let client = client.await;
367
368 let ctx = tarpc::context::current();
369 let library_full: LibraryFull = client.library_full(ctx).await??;
370
371 let ctx = tarpc::context::current();
372 let response = client
373 .library_album_get_songs(ctx, library_full.albums.first().unwrap().id.clone().into())
374 .await?
375 .unwrap();
376
377 assert_eq!(response, library_full.songs);
378
379 Ok(())
380 }
381
382 #[rstest]
383 #[tokio::test]
384 async fn test_library_artist_get_songs(#[future] client: MusicPlayerClient) -> Result<()> {
385 let client = client.await;
386
387 let ctx = tarpc::context::current();
388 let library_full: LibraryFull = client.library_full(ctx).await??;
389
390 let ctx = tarpc::context::current();
391 let response = client
392 .library_artist_get_songs(ctx, library_full.artists.first().unwrap().id.clone().into())
393 .await?
394 .unwrap();
395
396 assert_eq!(response, library_full.songs);
397
398 Ok(())
399 }
400
401 #[rstest]
402 #[tokio::test]
403 async fn test_library_artist_get_albums(#[future] client: MusicPlayerClient) -> Result<()> {
404 let client = client.await;
405
406 let ctx = tarpc::context::current();
407 let library_full: LibraryFull = client.library_full(ctx).await??;
408
409 let ctx = tarpc::context::current();
410 let response = client
411 .library_artist_get_albums(ctx, library_full.artists.first().unwrap().id.clone().into())
412 .await?
413 .unwrap();
414
415 assert_eq!(response, library_full.albums);
416
417 Ok(())
418 }
419
420 #[rstest]
421 #[tokio::test]
422 async fn test_playback_volume_toggle_mute(#[future] client: MusicPlayerClient) -> Result<()> {
423 let client = client.await;
424
425 let ctx = tarpc::context::current();
426
427 client.playback_volume_toggle_mute(ctx).await?;
428 Ok(())
429 }
430
431 #[rstest]
432 #[tokio::test]
433 async fn test_playback_stop(#[future] client: MusicPlayerClient) -> Result<()> {
434 let client = client.await;
435
436 let ctx = tarpc::context::current();
437
438 client.playback_stop(ctx).await?;
439 Ok(())
440 }
441
442 #[rstest]
443 #[tokio::test]
444 async fn test_queue_add_list(#[future] client: MusicPlayerClient) -> Result<()> {
445 let client = client.await;
446
447 let ctx = tarpc::context::current();
448 let library_full: LibraryFull = client.library_full(ctx).await??;
449
450 let ctx = tarpc::context::current();
451 let response = client
452 .queue_add_list(
453 ctx,
454 vec![library_full.songs.first().unwrap().id.clone().into()],
455 )
456 .await?;
457
458 assert_eq!(response, Ok(()));
459
460 Ok(())
461 }
462
463 #[rstest]
464 #[case::get(String::from("Playlist 0"))]
465 #[case::create(String::from("Playlist 1"))]
466 #[tokio::test]
467 async fn test_playlist_get_or_create(
468 #[future] client: MusicPlayerClient,
469 #[case] name: String,
470 ) -> Result<()> {
471 let client = client.await;
472
473 let ctx = tarpc::context::current();
474
475 let playlist_id = client
477 .playlist_get_or_create(ctx, name.clone())
478 .await?
479 .unwrap();
480
481 let ctx = tarpc::context::current();
483 let playlist = client.playlist_get(ctx, playlist_id).await?.unwrap();
484
485 assert_eq!(playlist.name, name);
486
487 Ok(())
488 }
489
490 #[rstest]
491 #[tokio::test]
492 async fn test_playlist_clone(#[future] client: MusicPlayerClient) -> Result<()> {
493 let client = client.await;
494
495 let ctx = tarpc::context::current();
496 let library_full: LibraryFull = client.library_full(ctx).await??;
497
498 let ctx = tarpc::context::current();
500 let playlist_id = client
501 .playlist_clone(
502 ctx,
503 library_full.playlists.first().unwrap().id.clone().into(),
504 )
505 .await?
506 .unwrap();
507
508 let ctx = tarpc::context::current();
510 let playlist = client.playlist_get(ctx, playlist_id).await?.unwrap();
511
512 assert_eq!(playlist.name, "Playlist 0 (copy)");
513
514 Ok(())
515 }
516
517 #[rstest]
518 #[tokio::test]
519 async fn test_playlist_get_songs(#[future] client: MusicPlayerClient) -> Result<()> {
520 let client = client.await;
521
522 let ctx = tarpc::context::current();
523 let library_full: LibraryFull = client.library_full(ctx).await??;
524
525 let response = client
527 .playlist_get_songs(
528 ctx,
529 library_full.playlists.first().unwrap().id.clone().into(),
530 )
531 .await?
532 .unwrap();
533
534 assert_eq!(response, library_full.songs);
535
536 Ok(())
537 }
538
539 #[rstest]
540 #[tokio::test]
541 async fn test_playlist_rename(#[future] client: MusicPlayerClient) -> Result<()> {
542 let client = client.await;
543
544 let ctx = tarpc::context::current();
545 let library_full: LibraryFull = client.library_full(ctx).await??;
546
547 let target = library_full.playlists.first().unwrap();
548
549 let ctx = tarpc::context::current();
550 let response = client
551 .playlist_rename(ctx, target.id.clone().into(), "New Name".into())
552 .await?;
553
554 let expected = Playlist {
555 name: "New Name".into(),
556 ..target.clone()
557 };
558
559 assert_eq!(response, Ok(expected.clone()));
560
561 let ctx = tarpc::context::current();
562 let response = client
563 .playlist_get(ctx, target.id.clone().into())
564 .await?
565 .unwrap();
566
567 assert_eq!(response, expected);
568 Ok(())
569 }
570
571 #[rstest]
572 #[tokio::test]
573 async fn test_collection_get_songs(#[future] client: MusicPlayerClient) -> Result<()> {
574 let client = client.await;
575
576 let ctx = tarpc::context::current();
577 let library_full: LibraryFull = client.library_full(ctx).await??;
578
579 let response = client
581 .collection_get_songs(
582 ctx,
583 library_full.collections.first().unwrap().id.clone().into(),
584 )
585 .await?
586 .unwrap();
587
588 assert_eq!(response, library_full.songs);
589
590 Ok(())
591 }
592
593 #[rstest]
594 #[tokio::test]
595 async fn test_dynamic_playlist_create(#[future] client: MusicPlayerClient) -> Result<()> {
596 let client = client.await;
597
598 let ctx = tarpc::context::current();
599
600 let query: Query = "artist CONTAINS \"Artist 0\"".parse()?;
601
602 let response = client
603 .dynamic_playlist_create(ctx, "Dynamic Playlist 0".into(), query)
604 .await?;
605
606 assert!(response.is_ok());
607
608 Ok(())
609 }
610
611 #[rstest]
612 #[tokio::test]
613 async fn test_dynamic_playlist_list(#[future] client: MusicPlayerClient) -> Result<()> {
614 let client = client.await;
615
616 let ctx = tarpc::context::current();
617
618 let query: Query = "artist CONTAINS \"Artist 0\"".parse()?;
619
620 let dynamic_playlist_id = client
621 .dynamic_playlist_create(ctx, "Dynamic Playlist 0".into(), query)
622 .await?
623 .unwrap();
624
625 let ctx = tarpc::context::current();
626 let response = client.dynamic_playlist_list(ctx).await?;
627
628 assert_eq!(response.len(), 1);
629 assert_eq!(response.first().unwrap().id, dynamic_playlist_id.into());
630
631 Ok(())
632 }
633
634 #[rstest]
635 #[tokio::test]
636 async fn test_dynamic_playlist_update(#[future] client: MusicPlayerClient) -> Result<()> {
637 let client = client.await;
638
639 let ctx = tarpc::context::current();
640
641 let query: Query = "artist CONTAINS \"Artist 0\"".parse()?;
642
643 let dynamic_playlist_id = client
644 .dynamic_playlist_create(ctx, "Dynamic Playlist 0".into(), query.clone())
645 .await?
646 .unwrap();
647
648 let ctx = tarpc::context::current();
649 let response = client
650 .dynamic_playlist_update(
651 ctx,
652 dynamic_playlist_id.clone(),
653 DynamicPlaylistChangeSet::new().name("Dynamic Playlist 1"),
654 )
655 .await?;
656
657 let expected = DynamicPlaylist {
658 id: dynamic_playlist_id.clone().into(),
659 name: "Dynamic Playlist 1".into(),
660 query: query.clone(),
661 };
662
663 assert_eq!(response, Ok(expected.clone()));
664
665 let ctx = tarpc::context::current();
666 let response = client
667 .dynamic_playlist_get(ctx, dynamic_playlist_id)
668 .await?
669 .unwrap();
670
671 assert_eq!(response, expected);
672
673 Ok(())
674 }
675
676 #[rstest]
677 #[tokio::test]
678 async fn test_dynamic_playlist_remove(#[future] client: MusicPlayerClient) -> Result<()> {
679 let client = client.await;
680
681 let ctx = tarpc::context::current();
682
683 let query: Query = "artist CONTAINS \"Artist 0\"".parse()?;
684
685 let dynamic_playlist_id = client
686 .dynamic_playlist_create(ctx, "Dynamic Playlist 0".into(), query)
687 .await?
688 .unwrap();
689
690 let ctx = tarpc::context::current();
691 let response = client
692 .dynamic_playlist_remove(ctx, dynamic_playlist_id)
693 .await?;
694
695 assert_eq!(response, Ok(()));
696
697 let ctx = tarpc::context::current();
698 let response = client.dynamic_playlist_list(ctx).await?;
699
700 assert_eq!(response.len(), 0);
701
702 Ok(())
703 }
704
705 #[rstest]
706 #[tokio::test]
707 async fn test_dynamic_playlist_get(#[future] client: MusicPlayerClient) -> Result<()> {
708 let client = client.await;
709
710 let ctx = tarpc::context::current();
711
712 let query: Query = "artist CONTAINS \"Artist 0\"".parse()?;
713
714 let dynamic_playlist_id = client
715 .dynamic_playlist_create(ctx, "Dynamic Playlist 0".into(), query.clone())
716 .await?
717 .unwrap();
718
719 let ctx = tarpc::context::current();
720 let response = client
721 .dynamic_playlist_get(ctx, dynamic_playlist_id)
722 .await?
723 .unwrap();
724
725 assert_eq!(response.name, "Dynamic Playlist 0");
726 assert_eq!(response.query, query);
727
728 Ok(())
729 }
730
731 #[rstest]
732 #[tokio::test]
733 async fn test_dynamic_playlist_get_songs(#[future] client: MusicPlayerClient) -> Result<()> {
734 let client = client.await;
735
736 let ctx = tarpc::context::current();
737
738 let query: Query = "artist CONTAINS \"Artist 0\"".parse()?;
739
740 let dynamic_playlist_id = client
741 .dynamic_playlist_create(ctx, "Dynamic Playlist 0".into(), query)
742 .await?
743 .unwrap();
744
745 let ctx = tarpc::context::current();
746 let response = client
747 .dynamic_playlist_get_songs(ctx, dynamic_playlist_id)
748 .await?
749 .unwrap();
750
751 assert_eq!(response.len(), 1);
752
753 Ok(())
754 }
755}