1use std::{
3 net::{IpAddr, Ipv4Addr},
4 sync::Arc,
5};
6use futures::{future, prelude::*};
8use log::info;
9use surrealdb::{engine::local::Db, Surreal};
10use tarpc::{
11 self,
12 server::{incoming::Incoming as _, BaseChannel, Channel as _},
13 tokio_serde::formats::Json,
14};
15use mecomp_core::{
17 audio::AudioKernelSender,
18 is_server_running,
19 logger::{init_logger, init_tracing},
20 rpc::{MusicPlayer as _, MusicPlayerClient},
21 udp::{Message, Sender},
22};
23use mecomp_storage::db::{init_database, set_database_path};
24use tokio::sync::RwLock;
25
26async fn spawn(fut: impl Future<Output = ()> + Send + 'static) {
27 tokio::spawn(fut);
28}
29
30pub mod config;
31pub mod controller;
32#[cfg(feature = "dynamic_updates")]
33pub mod dynamic_updates;
34pub mod services;
35#[cfg(test)]
36pub use mecomp_core::test_utils;
37
38use crate::config::Settings;
39use crate::controller::MusicPlayerServer;
40
41pub async fn start_daemon(
62 settings: Settings,
63 db_dir: std::path::PathBuf,
64 log_file_path: Option<std::path::PathBuf>,
65) -> anyhow::Result<()> {
66 let settings = Arc::new(settings);
68
69 if is_server_running(settings.daemon.rpc_port) {
71 anyhow::bail!(
72 "A server is already running on port {}",
73 settings.daemon.rpc_port
74 );
75 }
76
77 init_logger(settings.daemon.log_level, log_file_path);
79 set_database_path(db_dir)?;
80 let db = Arc::new(init_database().await?);
81 tracing::subscriber::set_global_default(init_tracing())?;
82
83 #[cfg(feature = "dynamic_updates")]
85 let guard = dynamic_updates::init_music_library_watcher(
86 db.clone(),
87 &settings.daemon.library_paths,
88 settings.daemon.artist_separator.clone(),
89 settings.daemon.genre_separator.clone(),
90 )?;
91
92 let (event_tx, event_rx) = std::sync::mpsc::channel();
94 let audio_kernel = AudioKernelSender::start(event_tx);
95 let event_publisher = Arc::new(RwLock::new(Sender::new().await?));
96 let server = MusicPlayerServer::new(
97 db.clone(),
98 settings.clone(),
99 audio_kernel.clone(),
100 event_publisher.clone(),
101 );
102
103 let eft_guard = tokio::spawn(async move {
107 while let Ok(event) = event_rx.recv() {
108 event_publisher
109 .read()
110 .await
111 .send(Message::StateChange(event))
112 .await
113 .unwrap();
114 }
115 });
116
117 let server_addr = (IpAddr::V4(Ipv4Addr::LOCALHOST), settings.daemon.rpc_port);
121
122 let mut listener = tarpc::serde_transport::tcp::listen(&server_addr, Json::default).await?;
123 info!("Listening on {}", listener.local_addr());
124 listener.config_mut().max_frame_length(usize::MAX);
125 listener
126 .filter_map(|r| future::ready(r.ok()))
128 .map(BaseChannel::with_defaults)
129 .max_channels_per_key(10, |t| t.transport().peer_addr().unwrap().ip())
131 .map(|channel| channel.execute(server.clone().serve()).for_each(spawn))
135 .buffer_unordered(10)
140 .for_each(|()| async {})
141 .await;
142
143 #[cfg(feature = "dynamic_updates")]
144 guard.stop();
145
146 eft_guard.abort();
147
148 Ok(())
149}
150
151pub async fn init_test_client_server(
158 db: Arc<Surreal<Db>>,
159 settings: Arc<Settings>,
160 audio_kernel: Arc<AudioKernelSender>,
161) -> anyhow::Result<MusicPlayerClient> {
162 let (client_transport, server_transport) = tarpc::transport::channel::unbounded();
163
164 let event_publisher = Arc::new(RwLock::new(Sender::new().await?));
165 let server = MusicPlayerServer::new(db, settings, audio_kernel, event_publisher);
166 tokio::spawn(
167 tarpc::server::BaseChannel::with_defaults(server_transport)
168 .execute(server.serve())
169 .for_each(|response| async move {
171 tokio::spawn(response);
172 }),
173 );
174
175 Ok(MusicPlayerClient::new(tarpc::client::Config::default(), client_transport).spawn())
178}
179
180#[cfg(test)]
181mod test_client_tests {
182 use super::*;
187 use anyhow::Result;
188 use mecomp_core::state::library::LibraryFull;
189 use mecomp_storage::{
190 db::schemas::{
191 collection::Collection,
192 dynamic::{query::Query, DynamicPlaylist, DynamicPlaylistChangeSet},
193 playlist::Playlist,
194 song::SongChangeSet,
195 },
196 test_utils::{create_song_with_overrides, init_test_database, SongCase},
197 };
198
199 use pretty_assertions::assert_eq;
200 use rstest::{fixture, rstest};
201
202 #[fixture]
203 async fn db() -> Arc<Surreal<Db>> {
204 let db = Arc::new(init_test_database().await.unwrap());
205
206 let song_case = SongCase::new(0, vec![0], vec![0], 0, 0);
209
210 let song = create_song_with_overrides(
212 &db,
213 song_case,
214 SongChangeSet {
215 artist: Some(one_or_many::OneOrMany::One("Artist 0".into())),
217 album_artist: Some(one_or_many::OneOrMany::One("Artist 0".into())),
218 album: Some("Album 0".into()),
219 ..Default::default()
220 },
221 )
222 .await
223 .unwrap();
224
225 let playlist = Playlist {
227 id: Playlist::generate_id(),
228 name: "Playlist 0".into(),
229 runtime: song.runtime,
230 song_count: 1,
231 };
232
233 let result = Playlist::create(&db, playlist).await.unwrap().unwrap();
234
235 Playlist::add_songs(&db, result.id, vec![song.id.clone()])
236 .await
237 .unwrap();
238
239 let collection = Collection {
241 id: Collection::generate_id(),
242 name: "Collection 0".into(),
243 runtime: song.runtime,
244 song_count: 1,
245 };
246
247 let result = Collection::create(&db, collection).await.unwrap().unwrap();
248
249 Collection::add_songs(&db, result.id, vec![song.id])
250 .await
251 .unwrap();
252
253 return db;
254 }
255
256 #[fixture]
257 async fn client(#[future] db: Arc<Surreal<Db>>) -> MusicPlayerClient {
258 let settings = Arc::new(Settings::default());
259 let (tx, _) = std::sync::mpsc::channel();
260 let audio_kernel = AudioKernelSender::start(tx);
261
262 init_test_client_server(db.await, settings, audio_kernel)
263 .await
264 .unwrap()
265 }
266
267 #[tokio::test]
268 async fn test_init_test_client_server() {
269 let db = Arc::new(init_test_database().await.unwrap());
270 let settings = Arc::new(Settings::default());
271 let (tx, _) = std::sync::mpsc::channel();
272 let audio_kernel = AudioKernelSender::start(tx);
273
274 let client = init_test_client_server(db, settings, audio_kernel)
275 .await
276 .unwrap();
277
278 let ctx = tarpc::context::current();
279 let response = client.ping(ctx).await.unwrap();
280
281 assert_eq!(response, "pong");
282
283 drop(client);
285 }
286
287 #[rstest]
288 #[tokio::test]
289 async fn test_library_song_get_artist(#[future] client: MusicPlayerClient) -> Result<()> {
290 let client = client.await;
291
292 let ctx = tarpc::context::current();
293 let library_full: LibraryFull = client.library_full(ctx).await??;
294
295 let ctx = tarpc::context::current();
296 let response = client
297 .library_song_get_artist(ctx, library_full.songs.first().unwrap().id.clone().into())
298 .await?;
299
300 assert_eq!(response, library_full.artists.into_vec().into());
301
302 Ok(())
303 }
304
305 #[rstest]
306 #[tokio::test]
307 async fn test_library_song_get_album(#[future] client: MusicPlayerClient) -> Result<()> {
308 let client = client.await;
309
310 let ctx = tarpc::context::current();
311 let library_full: LibraryFull = client.library_full(ctx).await??;
312
313 let ctx = tarpc::context::current();
314 let response = client
315 .library_song_get_album(ctx, library_full.songs.first().unwrap().id.clone().into())
316 .await?
317 .unwrap();
318
319 assert_eq!(response, library_full.albums.first().unwrap().clone());
320
321 Ok(())
322 }
323
324 #[rstest]
325 #[tokio::test]
326 async fn test_library_song_get_playlists(#[future] client: MusicPlayerClient) -> Result<()> {
327 let client = client.await;
328
329 let ctx = tarpc::context::current();
330 let library_full: LibraryFull = client.library_full(ctx).await??;
331
332 let ctx = tarpc::context::current();
333 let response = client
334 .library_song_get_playlists(ctx, library_full.songs.first().unwrap().id.clone().into())
335 .await?;
336
337 assert_eq!(response, library_full.playlists.into_vec().into());
338
339 Ok(())
340 }
341
342 #[rstest]
343 #[tokio::test]
344 async fn test_library_album_get_artist(#[future] client: MusicPlayerClient) -> Result<()> {
345 let client = client.await;
346
347 let ctx = tarpc::context::current();
348 let library_full: LibraryFull = client.library_full(ctx).await??;
349
350 let ctx = tarpc::context::current();
351 let response = client
352 .library_album_get_artist(ctx, library_full.albums.first().unwrap().id.clone().into())
353 .await?;
354
355 assert_eq!(response, library_full.artists.into_vec().into());
356
357 Ok(())
358 }
359
360 #[rstest]
361 #[tokio::test]
362 async fn test_library_album_get_songs(#[future] client: MusicPlayerClient) -> Result<()> {
363 let client = client.await;
364
365 let ctx = tarpc::context::current();
366 let library_full: LibraryFull = client.library_full(ctx).await??;
367
368 let ctx = tarpc::context::current();
369 let response = client
370 .library_album_get_songs(ctx, library_full.albums.first().unwrap().id.clone().into())
371 .await?
372 .unwrap();
373
374 assert_eq!(response, library_full.songs);
375
376 Ok(())
377 }
378
379 #[rstest]
380 #[tokio::test]
381 async fn test_library_artist_get_songs(#[future] client: MusicPlayerClient) -> Result<()> {
382 let client = client.await;
383
384 let ctx = tarpc::context::current();
385 let library_full: LibraryFull = client.library_full(ctx).await??;
386
387 let ctx = tarpc::context::current();
388 let response = client
389 .library_artist_get_songs(ctx, library_full.artists.first().unwrap().id.clone().into())
390 .await?
391 .unwrap();
392
393 assert_eq!(response, library_full.songs);
394
395 Ok(())
396 }
397
398 #[rstest]
399 #[tokio::test]
400 async fn test_library_artist_get_albums(#[future] client: MusicPlayerClient) -> Result<()> {
401 let client = client.await;
402
403 let ctx = tarpc::context::current();
404 let library_full: LibraryFull = client.library_full(ctx).await??;
405
406 let ctx = tarpc::context::current();
407 let response = client
408 .library_artist_get_albums(ctx, library_full.artists.first().unwrap().id.clone().into())
409 .await?
410 .unwrap();
411
412 assert_eq!(response, library_full.albums);
413
414 Ok(())
415 }
416
417 #[rstest]
418 #[tokio::test]
419 async fn test_playback_volume_toggle_mute(#[future] client: MusicPlayerClient) -> Result<()> {
420 let client = client.await;
421
422 let ctx = tarpc::context::current();
423
424 client.playback_volume_toggle_mute(ctx).await?;
425 Ok(())
426 }
427
428 #[rstest]
429 #[tokio::test]
430 async fn test_playback_stop(#[future] client: MusicPlayerClient) -> Result<()> {
431 let client = client.await;
432
433 let ctx = tarpc::context::current();
434
435 client.playback_stop(ctx).await?;
436 Ok(())
437 }
438
439 #[rstest]
440 #[tokio::test]
441 async fn test_queue_add_list(#[future] client: MusicPlayerClient) -> Result<()> {
442 let client = client.await;
443
444 let ctx = tarpc::context::current();
445 let library_full: LibraryFull = client.library_full(ctx).await??;
446
447 let ctx = tarpc::context::current();
448 let response = client
449 .queue_add_list(
450 ctx,
451 vec![library_full.songs.first().unwrap().id.clone().into()],
452 )
453 .await?;
454
455 assert_eq!(response, Ok(()));
456
457 Ok(())
458 }
459
460 #[rstest]
461 #[case::get(String::from("Playlist 0"))]
462 #[case::create(String::from("Playlist 1"))]
463 #[tokio::test]
464 async fn test_playlist_get_or_create(
465 #[future] client: MusicPlayerClient,
466 #[case] name: String,
467 ) -> Result<()> {
468 let client = client.await;
469
470 let ctx = tarpc::context::current();
471
472 let playlist_id = client
474 .playlist_get_or_create(ctx, name.clone())
475 .await?
476 .unwrap();
477
478 let ctx = tarpc::context::current();
480 let playlist = client.playlist_get(ctx, playlist_id).await?.unwrap();
481
482 assert_eq!(playlist.name, name.into());
483
484 Ok(())
485 }
486
487 #[rstest]
488 #[tokio::test]
489 async fn test_playlist_clone(#[future] client: MusicPlayerClient) -> Result<()> {
490 let client = client.await;
491
492 let ctx = tarpc::context::current();
493 let library_full: LibraryFull = client.library_full(ctx).await??;
494
495 let ctx = tarpc::context::current();
497 let playlist_id = client
498 .playlist_clone(
499 ctx,
500 library_full.playlists.first().unwrap().id.clone().into(),
501 )
502 .await?
503 .unwrap();
504
505 let ctx = tarpc::context::current();
507 let playlist = client.playlist_get(ctx, playlist_id).await?.unwrap();
508
509 assert_eq!(playlist.name, "Playlist 0 (copy)".into());
510
511 Ok(())
512 }
513
514 #[rstest]
515 #[tokio::test]
516 async fn test_playlist_get_songs(#[future] client: MusicPlayerClient) -> Result<()> {
517 let client = client.await;
518
519 let ctx = tarpc::context::current();
520 let library_full: LibraryFull = client.library_full(ctx).await??;
521
522 let response = client
524 .playlist_get_songs(
525 ctx,
526 library_full.playlists.first().unwrap().id.clone().into(),
527 )
528 .await?
529 .unwrap();
530
531 assert_eq!(response, library_full.songs);
532
533 Ok(())
534 }
535
536 #[rstest]
537 #[tokio::test]
538 async fn test_playlist_rename(#[future] client: MusicPlayerClient) -> Result<()> {
539 let client = client.await;
540
541 let ctx = tarpc::context::current();
542 let library_full: LibraryFull = client.library_full(ctx).await??;
543
544 let target = library_full.playlists.first().unwrap();
545
546 let ctx = tarpc::context::current();
547 let response = client
548 .playlist_rename(ctx, target.id.clone().into(), "New Name".into())
549 .await?;
550
551 let expected = Playlist {
552 name: "New Name".into(),
553 ..target.clone()
554 };
555
556 assert_eq!(response, Ok(expected.clone()));
557
558 let ctx = tarpc::context::current();
559 let response = client
560 .playlist_get(ctx, target.id.clone().into())
561 .await?
562 .unwrap();
563
564 assert_eq!(response, expected);
565 Ok(())
566 }
567
568 #[rstest]
569 #[tokio::test]
570 async fn test_collection_get_songs(#[future] client: MusicPlayerClient) -> Result<()> {
571 let client = client.await;
572
573 let ctx = tarpc::context::current();
574 let library_full: LibraryFull = client.library_full(ctx).await??;
575
576 let response = client
578 .collection_get_songs(
579 ctx,
580 library_full.collections.first().unwrap().id.clone().into(),
581 )
582 .await?
583 .unwrap();
584
585 assert_eq!(response, library_full.songs);
586
587 Ok(())
588 }
589
590 #[rstest]
591 #[tokio::test]
592 async fn test_dynamic_playlist_create(#[future] client: MusicPlayerClient) -> Result<()> {
593 let client = client.await;
594
595 let ctx = tarpc::context::current();
596
597 let query: Query = "artist CONTAINS \"Artist 0\"".parse()?;
598
599 let response = client
600 .dynamic_playlist_create(ctx, "Dynamic Playlist 0".into(), query)
601 .await?;
602
603 assert!(response.is_ok());
604
605 Ok(())
606 }
607
608 #[rstest]
609 #[tokio::test]
610 async fn test_dynamic_playlist_list(#[future] client: MusicPlayerClient) -> Result<()> {
611 let client = client.await;
612
613 let ctx = tarpc::context::current();
614
615 let query: Query = "artist CONTAINS \"Artist 0\"".parse()?;
616
617 let dynamic_playlist_id = client
618 .dynamic_playlist_create(ctx, "Dynamic Playlist 0".into(), query)
619 .await?
620 .unwrap();
621
622 let ctx = tarpc::context::current();
623 let response = client.dynamic_playlist_list(ctx).await?;
624
625 assert_eq!(response.len(), 1);
626 assert_eq!(response.first().unwrap().id, dynamic_playlist_id.into());
627
628 Ok(())
629 }
630
631 #[rstest]
632 #[tokio::test]
633 async fn test_dynamic_playlist_update(#[future] client: MusicPlayerClient) -> Result<()> {
634 let client = client.await;
635
636 let ctx = tarpc::context::current();
637
638 let query: Query = "artist CONTAINS \"Artist 0\"".parse()?;
639
640 let dynamic_playlist_id = client
641 .dynamic_playlist_create(ctx, "Dynamic Playlist 0".into(), query.clone())
642 .await?
643 .unwrap();
644
645 let ctx = tarpc::context::current();
646 let response = client
647 .dynamic_playlist_update(
648 ctx,
649 dynamic_playlist_id.clone(),
650 DynamicPlaylistChangeSet::new().name("Dynamic Playlist 1"),
651 )
652 .await?;
653
654 let expected = DynamicPlaylist {
655 id: dynamic_playlist_id.clone().into(),
656 name: "Dynamic Playlist 1".into(),
657 query: query.clone(),
658 };
659
660 assert_eq!(response, Ok(expected.clone()));
661
662 let ctx = tarpc::context::current();
663 let response = client
664 .dynamic_playlist_get(ctx, dynamic_playlist_id)
665 .await?
666 .unwrap();
667
668 assert_eq!(response, expected);
669
670 Ok(())
671 }
672
673 #[rstest]
674 #[tokio::test]
675 async fn test_dynamic_playlist_remove(#[future] client: MusicPlayerClient) -> Result<()> {
676 let client = client.await;
677
678 let ctx = tarpc::context::current();
679
680 let query: Query = "artist CONTAINS \"Artist 0\"".parse()?;
681
682 let dynamic_playlist_id = client
683 .dynamic_playlist_create(ctx, "Dynamic Playlist 0".into(), query)
684 .await?
685 .unwrap();
686
687 let ctx = tarpc::context::current();
688 let response = client
689 .dynamic_playlist_remove(ctx, dynamic_playlist_id)
690 .await?;
691
692 assert_eq!(response, Ok(()));
693
694 let ctx = tarpc::context::current();
695 let response = client.dynamic_playlist_list(ctx).await?;
696
697 assert_eq!(response.len(), 0);
698
699 Ok(())
700 }
701
702 #[rstest]
703 #[tokio::test]
704 async fn test_dynamic_playlist_get(#[future] client: MusicPlayerClient) -> Result<()> {
705 let client = client.await;
706
707 let ctx = tarpc::context::current();
708
709 let query: Query = "artist CONTAINS \"Artist 0\"".parse()?;
710
711 let dynamic_playlist_id = client
712 .dynamic_playlist_create(ctx, "Dynamic Playlist 0".into(), query.clone())
713 .await?
714 .unwrap();
715
716 let ctx = tarpc::context::current();
717 let response = client
718 .dynamic_playlist_get(ctx, dynamic_playlist_id)
719 .await?
720 .unwrap();
721
722 assert_eq!(response.name, "Dynamic Playlist 0".into());
723 assert_eq!(response.query, query);
724
725 Ok(())
726 }
727
728 #[rstest]
729 #[tokio::test]
730 async fn test_dynamic_playlist_get_songs(#[future] client: MusicPlayerClient) -> Result<()> {
731 let client = client.await;
732
733 let ctx = tarpc::context::current();
734
735 let query: Query = "artist CONTAINS \"Artist 0\"".parse()?;
736
737 let dynamic_playlist_id = client
738 .dynamic_playlist_create(ctx, "Dynamic Playlist 0".into(), query)
739 .await?
740 .unwrap();
741
742 let ctx = tarpc::context::current();
743 let response = client
744 .dynamic_playlist_get_songs(ctx, dynamic_playlist_id)
745 .await?
746 .unwrap();
747
748 assert_eq!(response.len(), 1);
749
750 Ok(())
751 }
752}