1use std::sync::Arc;
2use std::time::Duration;
3
4use anyhow::{Context, Result};
5use async_trait::async_trait;
6use parking_lot::RwLock;
7pub use playlist::Playlist;
8use termusiclib::config::SharedServerSettings;
9use termusiclib::config::v2::server::config_extra::ServerConfigVersionedDefaulted;
10use termusiclib::new_database::{Database, track_ops};
11use termusiclib::player::playlist_helpers::{
12 PlaylistAddTrack, PlaylistPlaySpecific, PlaylistRemoveTrackIndexed, PlaylistSwapTrack,
13};
14use termusiclib::player::{
15 PlayerProgress, PlayerTimeUnit, RunningStatus, TrackChangedInfo, UpdateEvents,
16};
17use termusiclib::podcast::db::Database as DBPod;
18use termusiclib::track::{MediaTypes, Track};
19use termusiclib::utils::get_app_config_path;
20use tokio::runtime::Handle;
21use tokio::sync::mpsc::error::SendError;
22use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
23use tokio::sync::{broadcast, oneshot};
24
25pub use backends::{Backend, BackendSelect};
26
27mod discord;
28mod mpris;
29pub mod playlist;
30
31#[macro_use]
32extern crate log;
33
34mod backends;
35
/// Re-exports the otherwise private `async_ring` module from the `rusty` backend.
///
/// NOTE(review): judging by the `__bench` name this is presumably only meant for the
/// crate's benchmarks and not a stable public API — confirm before relying on it.
pub mod __bench {
    pub use super::backends::rusty::source::async_ring;
}
42
/// Receiving half of the one-shot completion signal returned by [`PlayerCmdSender::send_cb`].
pub type PlayerCmdCallback = oneshot::Receiver<()>;
/// Receiving end of the command channel; every item is a command paired with its
/// (possibly empty) callback sender.
pub type PlayerCmdReciever = UnboundedReceiver<(PlayerCmd, PlayerCmdCallbackSender)>;
45
/// Optional one-shot sender that travels with a [`PlayerCmd`].
///
/// Holds `None` when the command was sent without requesting a callback.
#[derive(Debug)]
pub struct PlayerCmdCallbackSender(Option<oneshot::Sender<()>>);
49
50impl PlayerCmdCallbackSender {
51 pub fn call(self) {
53 let Some(sender) = self.0 else {
54 return;
55 };
56 let _ = sender.send(());
57 }
58}
59
/// Cloneable wrapper around the unbounded command channel sender.
///
/// Every message is a `(PlayerCmd, PlayerCmdCallbackSender)` tuple.
#[derive(Debug, Clone)]
pub struct PlayerCmdSender(UnboundedSender<(PlayerCmd, PlayerCmdCallbackSender)>);
63
64impl PlayerCmdSender {
65 pub fn send(
70 &self,
71 cmd: PlayerCmd,
72 ) -> Result<(), SendError<(PlayerCmd, PlayerCmdCallbackSender)>> {
73 self.0.send((cmd, PlayerCmdCallbackSender(None)))
74 }
75
76 pub fn send_cb(
81 &self,
82 cmd: PlayerCmd,
83 ) -> Result<PlayerCmdCallback, SendError<(PlayerCmd, PlayerCmdCallbackSender)>> {
84 let (tx, rx) = oneshot::channel();
85 self.0.send((cmd, PlayerCmdCallbackSender(Some(tx))))?;
86 Ok(rx)
87 }
88
89 #[must_use]
90 pub fn new(tx: UnboundedSender<(PlayerCmd, PlayerCmdCallbackSender)>) -> Self {
91 Self(tx)
92 }
93}
94
/// Payload of [`PlayerCmd::Error`], telling apart where a player error came from.
///
/// `Eq` is derived alongside `PartialEq`: the enum has no fields, so equality is total.
#[derive(Clone, Debug, Copy, PartialEq, Eq)]
pub enum PlayerErrorType {
    /// NOTE(review): presumably an error concerning the currently playing track — confirm.
    Current,
    /// NOTE(review): presumably an error concerning the track enqueued to play next — confirm.
    Enqueue,
}
102
/// Commands that are sent through [`PlayerCmdSender`] and received via [`PlayerCmdReciever`].
#[derive(Clone, Debug)]
pub enum PlayerCmd {
    // NOTE(review): the first group looks like events raised by the playback backend
    // rather than user requests — confirm against the command handler.
    AboutToFinish,
    /// NOTE(review): presumably "end of stream" — confirm.
    Eos,
    MetadataChanged,
    /// A player error, classified by [`PlayerErrorType`].
    Error(PlayerErrorType),

    Tick,

    CycleLoop,
    GetProgress,
    SkipPrevious,
    Pause,
    Play,
    Quit,
    ReloadConfig,
    ReloadPlaylist,
    SeekBackward,
    SeekForward,
    SkipNext,
    SpeedDown,
    SpeedUp,
    ToggleGapless,
    TogglePause,
    VolumeDown,
    VolumeUp,
    /// Set the volume to the contained [`Volume`] value.
    VolumeSet(Volume),

    // Playlist manipulation commands; the payload types come from
    // `termusiclib::player::playlist_helpers`.
    PlaylistPlaySpecific(PlaylistPlaySpecific),
    PlaylistAddTrack(PlaylistAddTrack),
    PlaylistRemoveTrack(PlaylistRemoveTrackIndexed),
    PlaylistClear,
    PlaylistSwapTrack(PlaylistSwapTrack),
    PlaylistShuffle,
    PlaylistRemoveDeletedTracks,
}
149
/// Broadcast sender on which [`UpdateEvents`] are published.
pub type StreamTX = broadcast::Sender<UpdateEvents>;
/// A [`Playlist`] shared behind an `Arc<RwLock<_>>`.
pub type SharedPlaylist = Arc<RwLock<Playlist>>;
152
/// Front-end over the selected playback [`Backend`] that also owns the playlist handle,
/// configuration, optional media-control / Discord integrations and the databases.
#[allow(clippy::module_name_repetitions)]
pub struct GeneralPlayer {
    /// The selected playback backend.
    pub backend: Backend,
    /// Shared playlist handle.
    pub playlist: SharedPlaylist,
    /// Shared server settings.
    pub config: SharedServerSettings,
    /// Set to `true` by `start_play` and reported in `TrackChanged` events.
    pub current_track_updated: bool,
    /// Present only while `use_mediacontrols` is enabled in the config.
    pub mpris: Option<mpris::Mpris>,
    /// Present only while the Discord status is enabled in the config.
    pub discord: Option<discord::Rpc>,
    /// Track database (used for storing/restoring last positions of tracks).
    pub db: Database,
    /// Podcast database (used for storing/restoring last positions of episodes).
    pub db_podcast: DBPod,
    /// Sender for player commands; cloned into the backend and mpris.
    pub cmd_tx: PlayerCmdSender,
    /// Broadcast sender for [`UpdateEvents`].
    pub stream_tx: StreamTX,

    /// Error counter, see `increment_errors` / `reset_errors`.
    pub errors_since_last_progress: usize,
}
169
170impl GeneralPlayer {
171 pub fn new_backend(
178 backend: BackendSelect,
179 config: SharedServerSettings,
180 cmd_tx: PlayerCmdSender,
181 stream_tx: StreamTX,
182 playlist: SharedPlaylist,
183 ) -> Result<Self> {
184 let backend = Backend::new_select(backend, config.clone(), cmd_tx.clone());
185
186 let db_path = get_app_config_path().with_context(|| "failed to get podcast db path.")?;
187
188 let db_podcast = DBPod::new(&db_path).with_context(|| "error connecting to podcast db.")?;
189 let config_read = config.read();
190 let db = Database::new_default_path()?;
191
192 let mpris = if config.read().settings.player.use_mediacontrols {
193 let mut mpris = mpris::Mpris::new(cmd_tx.clone());
194
195 mpris.update_volume(backend.as_player().volume());
198
199 Some(mpris)
200 } else {
201 None
202 };
203 let discord = if config.read().get_discord_status_enable() {
204 Some(discord::Rpc::default())
205 } else {
206 None
207 };
208
209 drop(config_read);
210
211 Ok(Self {
212 backend,
213 playlist,
214 config,
215 mpris,
216 discord,
217 db,
218 db_podcast,
219 cmd_tx,
220 stream_tx,
221 current_track_updated: false,
222
223 errors_since_last_progress: 0,
224 })
225 }
226
227 pub fn increment_errors(&mut self) {
229 self.errors_since_last_progress += 1;
230 }
231
232 pub fn reset_errors(&mut self) {
234 self.errors_since_last_progress = 0;
235 }
236
237 pub fn new(
244 config: SharedServerSettings,
245 cmd_tx: PlayerCmdSender,
246 stream_tx: StreamTX,
247 playlist: SharedPlaylist,
248 ) -> Result<Self> {
249 Self::new_backend(
250 BackendSelect::default(),
251 config,
252 cmd_tx,
253 stream_tx,
254 playlist,
255 )
256 }
257
258 pub fn reload_config(&mut self) -> Result<()> {
264 info!("Reloading config");
265 let mut config = self.config.write();
266 let parsed = ServerConfigVersionedDefaulted::from_config_path()?.into_settings();
267 config.settings = parsed;
268
269 if config.settings.player.use_mediacontrols && self.mpris.is_none() {
270 let mut mpris = mpris::Mpris::new(self.cmd_tx.clone());
272 if let Some(track) = self.playlist.read().current_track() {
274 mpris.add_and_play(track);
275 }
276 mpris.update_volume(self.volume());
278 self.mpris.replace(mpris);
279 } else if !config.settings.player.use_mediacontrols && self.mpris.is_some() {
280 self.mpris.take();
282 }
283
284 if config.get_discord_status_enable() && self.discord.is_none() {
285 let discord = discord::Rpc::default();
287
288 if let Some(track) = self.playlist.read().current_track() {
290 discord.update(track);
291 }
292
293 self.discord.replace(discord);
294 } else if !config.get_discord_status_enable() && self.discord.is_some() {
295 self.discord.take();
297 }
298
299 info!("Config Reloaded");
300
301 Ok(())
302 }
303
304 fn get_player(&self) -> &dyn PlayerTrait {
305 self.backend.as_player()
306 }
307
308 fn get_player_mut(&mut self) -> &mut (dyn PlayerTrait + Send) {
309 self.backend.as_player_mut()
310 }
311
312 pub fn toggle_gapless(&mut self) -> bool {
313 let new_gapless = !<Self as PlayerTrait>::gapless(self);
314 <Self as PlayerTrait>::set_gapless(self, new_gapless);
315 self.config.write().settings.player.gapless = new_gapless;
316 new_gapless
317 }
318
319 pub fn start_play(&mut self) {
325 let mut playlist = self.playlist.write();
326 if playlist.is_stopped() | playlist.is_paused() {
327 playlist.set_status(RunningStatus::Running);
328 }
329
330 playlist.proceed();
331
332 if let Some(track) = playlist.current_track().cloned() {
333 info!("Starting Track {track:#?}");
334
335 if playlist.has_next_track() {
336 playlist.set_next_track(None);
337 drop(playlist);
338 self.current_track_updated = true;
339 info!("gapless next track played");
340 self.add_and_play_mpris_discord();
341
342 self.send_track_changed();
343
344 return;
345 }
346 drop(playlist);
347
348 self.current_track_updated = true;
349 let wait = async {
350 self.add_and_play(&track).await;
351 };
352 Handle::current().block_on(wait);
353
354 self.add_and_play_mpris_discord();
355 self.player_restore_last_position();
356
357 self.send_track_changed();
358 }
359 }
360
361 pub fn metadata_changed(&mut self) {
363 self.send_track_changed();
364 }
365
366 fn send_track_changed(&mut self) {
368 self.send_stream_ev(UpdateEvents::TrackChanged(TrackChangedInfo {
369 current_track_index: u64::try_from(self.playlist.read().get_current_track_index())
370 .unwrap(),
371 current_track_updated: self.current_track_updated,
372 title: self.media_info().media_title,
373 progress: self.get_progress(),
374 }));
375 }
376
377 fn add_and_play_mpris_discord(&mut self) {
378 if let Some(track) = self.playlist.read().current_track() {
379 if let Some(ref mut mpris) = self.mpris {
380 mpris.add_and_play(track);
381 }
382
383 if let Some(ref discord) = self.discord {
384 discord.update(track);
385 }
386 }
387 }
388 pub fn enqueue_next_from_playlist(&mut self) {
389 let mut playlist = self.playlist.write();
390 if playlist.has_next_track() {
391 return;
392 }
393
394 let Some(track) = playlist.fetch_next_track().cloned() else {
395 return;
396 };
397 drop(playlist);
398
399 self.enqueue_next(&track);
400
401 info!("Next track enqueued: {track:#?}");
402 }
403
404 pub fn next(&mut self) {
406 if self.playlist.read().current_track().is_some() {
407 info!("skip route 1 which is in most cases.");
408 self.playlist.write().set_next_track(None);
409 self.skip_one();
410 } else {
411 info!("skip route 2 cause no current track.");
412 self.stop();
413 }
414 }
415
416 pub fn previous(&mut self) {
418 let mut playlist = self.playlist.write();
419 playlist.previous();
420 playlist.proceed_false();
421 drop(playlist);
422 self.next();
423 }
424
425 pub fn toggle_pause(&mut self) {
427 let status = self.playlist.read().status();
430 match status {
431 RunningStatus::Running => {
432 <Self as PlayerTrait>::pause(self);
433 }
434 RunningStatus::Stopped => {}
435 RunningStatus::Paused => {
436 <Self as PlayerTrait>::resume(self);
437 }
438 }
439 }
440
441 pub fn pause(&mut self) {
443 let status = self.playlist.read().status();
446 match status {
447 RunningStatus::Running => {
448 <Self as PlayerTrait>::pause(self);
449 }
450 RunningStatus::Stopped | RunningStatus::Paused => {}
451 }
452 }
453
454 pub fn play(&mut self) {
456 let status = self.playlist.read().status();
459 match status {
460 RunningStatus::Running | RunningStatus::Stopped => {}
461 RunningStatus::Paused => {
462 <Self as PlayerTrait>::resume(self);
463 }
464 }
465 }
466 pub fn seek_relative(&mut self, forward: bool) {
470 let track_len = self
472 .playlist
473 .read()
474 .current_track()
475 .and_then(Track::duration)
476 .unwrap_or(Duration::from_secs(5))
477 .as_secs();
478
479 let mut offset = self
480 .config
481 .read()
482 .settings
483 .player
484 .seek_step
485 .get_step(track_len);
486
487 if !forward {
488 offset = -offset;
489 }
490 self.seek(offset).expect("Error in player seek.");
491 }
492
493 fn set_last_position(&self, track: &Track, to: Option<Duration>) -> Result<()> {
495 match track.inner() {
496 MediaTypes::Track(track_data) => {
497 track_ops::set_last_position(&self.db.get_connection(), track_data.path(), to)
498 .with_context(|| track_data.path().to_string_lossy().to_string())?;
499 }
500 MediaTypes::Radio(_) => (),
501 MediaTypes::Podcast(_podcast_track_data) => {
502 let to = to.unwrap_or_default();
503 self.db_podcast
504 .set_last_position(track, to)
505 .context("Podcast Episode")?;
506 }
507 }
508
509 Ok(())
510 }
511
512 #[allow(clippy::cast_sign_loss)]
513 pub fn player_save_last_position(&mut self) {
514 let playlist = self.playlist.read();
515 let Some(track) = playlist.current_track() else {
516 info!("Not saving Last position as there is no current track");
517 return;
518 };
519 let Some(position) = self.position() else {
520 info!("Not saving Last position as there is no position");
521 return;
522 };
523
524 let Some(time_before_save) = self
525 .config
526 .read()
527 .settings
528 .player
529 .remember_position
530 .get_time(track.media_type())
531 else {
532 info!(
533 "Not saving Last position as \"Remember last position\" is not enabled for {:#?}",
534 track.media_type()
535 );
536 return;
537 };
538
539 if time_before_save < position.as_secs() {
540 if let Err(err) = self.set_last_position(track, Some(position)) {
541 error!("Saving last_position failed. Error: {err:#?}");
542 }
543 } else {
544 info!("Not saving Last position as the position is lower than time_before_save");
545 }
546 }
547
548 pub fn player_restore_last_position(&mut self) {
549 let playlist = self.playlist.read();
550 let Some(track) = playlist.current_track().cloned() else {
551 info!("Not restoring Last position as there is no current track");
552 return;
553 };
554 drop(playlist);
555
556 let mut restored = false;
557
558 if self
559 .config
560 .read()
561 .settings
562 .player
563 .remember_position
564 .is_enabled_for(track.media_type())
565 {
566 match track.inner() {
567 MediaTypes::Track(track_data) => {
568 let res =
569 track_ops::get_last_position(&self.db.get_connection(), track_data.path());
570 if let Ok(Some(last_pos)) = res {
571 self.seek_to(last_pos);
572 restored = true;
573 }
574 }
575 MediaTypes::Radio(_) => (),
576 MediaTypes::Podcast(_podcast_track_data) => {
577 if let Ok(last_pos) = self.db_podcast.get_last_position(&track) {
578 self.seek_to(last_pos);
579 restored = true;
580 }
581 }
582 }
583 } else {
584 info!(
585 "Not restoring Last position as it is not enabled for {:#?}",
586 track.media_type()
587 );
588 }
589
590 if restored {
592 if let Err(err) = self.set_last_position(&track, None) {
593 error!("Resetting last_position failed. Error: {err:#?}");
594 }
595 }
596 }
597
598 pub fn update_progress(&mut self, progress: &PlayerProgress) {
600 self.mpris_update_progress(progress);
601
602 self.send_stream_ev_no_err(UpdateEvents::Progress(*progress));
603 }
604
605 fn send_stream_ev(&self, ev: UpdateEvents) {
607 if self.stream_tx.send(ev).is_err() {
609 debug!("Stream Event not send: No Receivers");
610 }
611 }
612
613 fn send_stream_ev_no_err(&self, ev: UpdateEvents) {
617 let _ = self.stream_tx.send(ev);
618 }
619}
620
621#[async_trait]
622impl PlayerTrait for GeneralPlayer {
623 async fn add_and_play(&mut self, track: &Track) {
624 self.get_player_mut().add_and_play(track).await;
625 }
626 fn volume(&self) -> Volume {
627 self.get_player().volume()
628 }
629 fn add_volume(&mut self, volume: VolumeSigned) -> Volume {
630 let vol = self.get_player_mut().add_volume(volume);
631 self.mpris_volume_update();
632 self.send_stream_ev(UpdateEvents::VolumeChanged { volume: vol });
633
634 vol
635 }
636 fn set_volume(&mut self, volume: Volume) -> Volume {
637 let vol = self.get_player_mut().set_volume(volume);
638 self.mpris_volume_update();
639 self.send_stream_ev(UpdateEvents::VolumeChanged { volume: vol });
640
641 vol
642 }
643 fn pause(&mut self) {
645 self.playlist.write().set_status(RunningStatus::Paused);
646 self.get_player_mut().pause();
647 if let Some(ref mut mpris) = self.mpris {
648 mpris.pause();
649 }
650 if let Some(ref discord) = self.discord {
651 discord.pause();
652 }
653 }
654 fn resume(&mut self) {
656 self.playlist.write().set_status(RunningStatus::Running);
657 self.get_player_mut().resume();
658 if let Some(ref mut mpris) = self.mpris {
659 mpris.resume();
660 }
661 let time_pos = self.get_player().position();
662 if let Some(ref discord) = self.discord {
663 discord.resume(time_pos);
664 }
665 }
666 fn is_paused(&self) -> bool {
667 self.get_player().is_paused()
668 }
669 fn seek(&mut self, secs: i64) -> Result<()> {
670 self.get_player_mut().seek(secs)
671 }
672 fn seek_to(&mut self, position: Duration) {
673 self.get_player_mut().seek_to(position);
674 }
675
676 fn set_speed(&mut self, speed: Speed) -> Speed {
677 let speed = self.get_player_mut().set_speed(speed);
678 self.send_stream_ev(UpdateEvents::SpeedChanged { speed });
679
680 speed
681 }
682
683 fn add_speed(&mut self, speed: SpeedSigned) -> Speed {
684 let speed = self.get_player_mut().add_speed(speed);
685 self.send_stream_ev(UpdateEvents::SpeedChanged { speed });
686
687 speed
688 }
689
690 fn speed(&self) -> Speed {
691 self.get_player().speed()
692 }
693
694 fn stop(&mut self) {
695 self.playlist.write().stop();
696 self.get_player_mut().stop();
697 }
698
699 fn get_progress(&self) -> Option<PlayerProgress> {
700 self.get_player().get_progress()
701 }
702
703 fn gapless(&self) -> bool {
704 self.get_player().gapless()
705 }
706
707 fn set_gapless(&mut self, to: bool) {
708 self.get_player_mut().set_gapless(to);
709 self.send_stream_ev(UpdateEvents::GaplessChanged { gapless: to });
710 }
711
712 fn skip_one(&mut self) {
713 self.get_player_mut().skip_one();
714 }
715
716 fn position(&self) -> Option<PlayerTimeUnit> {
717 self.get_player().position()
718 }
719
720 fn enqueue_next(&mut self, track: &Track) {
721 self.get_player_mut().enqueue_next(track);
722 }
723
724 fn media_info(&self) -> MediaInfo {
725 self.get_player().media_info()
726 }
727}
728
/// Information about the current media, as returned by [`PlayerTrait::media_info`].
///
/// `Eq` is derived alongside `PartialEq`, as the only field (`Option<String>`) has
/// total equality.
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub struct MediaInfo {
    /// Title of the current media, if one is known.
    pub media_title: Option<String>,
}
737
/// Primitive type used for absolute volume values.
// NOTE(review): the unit/range (e.g. percent) is not visible here — confirm against the backends.
pub type Volume = u16;
/// Signed counterpart of [`Volume`], used for relative volume changes.
pub type VolumeSigned = i16;
/// Primitive type used for playback speed values; bounded by [`MIN_SPEED`] and [`MAX_SPEED`]
/// in the default `PlayerTrait::add_speed`.
pub type Speed = i32;
/// Type used for relative speed changes; identical to [`Speed`], which is already signed.
pub type SpeedSigned = Speed;
744
/// Lower bound used when clamping the speed in the default `PlayerTrait::add_speed`.
// NOTE(review): presumably expressed in tenths (1 = 0.1x) — confirm against the backends.
pub const MIN_SPEED: Speed = 1;
/// Upper bound used when clamping the speed in the default `PlayerTrait::add_speed`.
// NOTE(review): presumably expressed in tenths (30 = 3.0x) — confirm against the backends.
pub const MAX_SPEED: Speed = 30;
747
748#[allow(clippy::module_name_repetitions)]
749#[async_trait]
750pub trait PlayerTrait {
751 async fn add_and_play(&mut self, track: &Track);
753 fn volume(&self) -> Volume;
755 fn add_volume(&mut self, volume: VolumeSigned) -> Volume {
759 let volume = self.volume().saturating_add_signed(volume);
760 self.set_volume(volume)
761 }
762 fn set_volume(&mut self, volume: Volume) -> Volume;
766 fn pause(&mut self);
767 fn resume(&mut self);
768 fn is_paused(&self) -> bool;
769 fn seek(&mut self, secs: i64) -> Result<()>;
775 fn seek_to(&mut self, position: Duration);
778 fn get_progress(&self) -> Option<PlayerProgress>;
780 fn set_speed(&mut self, speed: Speed) -> Speed;
784 fn add_speed(&mut self, speed: SpeedSigned) -> Speed {
788 let speed = (self.speed() + speed).clamp(MIN_SPEED, MAX_SPEED);
790 self.set_speed(speed)
791 }
792 fn speed(&self) -> Speed;
794 fn stop(&mut self);
795 fn gapless(&self) -> bool;
796 fn set_gapless(&mut self, to: bool);
797 fn skip_one(&mut self);
798 fn position(&self) -> Option<PlayerTimeUnit> {
802 self.get_progress()?.position
803 }
804 fn enqueue_next(&mut self, track: &Track);
806 fn media_info(&self) -> MediaInfo;
808}