mpris_async/
events.rs

//! [`PlayerEventsStream`] yields the events emitted by a given player as they occur.
//! Alternatively, the struct exposes a receiver which can be used to track events.
3
4use std::{task::{Waker, Poll}, thread};
5
6use async_std::{channel::{Sender, Receiver, unbounded, TryRecvError}, task, stream::Stream};
7use mpris::{Player, Event, PlayerFinder};
8
9/// Infinite Stream which tracks the Events emitted by a player. Streams created with new create a
10/// new thread to track events. 
11#[derive(Debug, Clone)]
12pub struct PlayerEventsStream {
13    // Identity is used because we cannot send player across threads or tasks
14    identity: String,
15    sender: Sender<Event>,
16    reciever: Receiver<Event>,
17    wakers: (Sender<Waker>, Receiver<Waker>),
18}
19
20impl PlayerEventsStream {
21    /// Creates a new [`PlayerEventsStream`] to track the changes of a player. This function will
22    /// spawn a new thread that listens for changes and sends them to the stream and any stream
23    /// cloned from it. The thread only closes once the player has quit.
24    pub fn new(player: &Player) -> PlayerEventsStream {
25        let (s, r) = unbounded();
26        let (wake_send, wake_reciev) = unbounded();
27        let streamer = PlayerEventsStream {identity: player.identity().to_string(), sender: s, reciever: r, wakers: (wake_send, wake_reciev)};
28        let stream_clone = streamer.clone();
29        thread::spawn(move || stream_clone.events_listener());
30        return streamer;
31    }
32
33    fn events_listener(self) {
34        let finder = match PlayerFinder::new() {
35            Ok(x) => x,
36            Err(_) => {
37                self.sender.try_send(Event::PlayerShutDown).unwrap();
38                self.sender.close();
39                return;
40            },
41        };
42        let player = finder.find_by_name(&self.identity).unwrap();
43        let events = player.events().unwrap();
44        
45        for event in events {
46            self.sender.try_send(match event.as_ref().unwrap() {
47                Event::PlayerShutDown => Event::PlayerShutDown,
48                Event::Paused => Event::Paused,
49                Event::Playing => Event::Playing,
50                Event::Stopped => Event::Stopped,
51                Event::LoopingChanged(status) => Event::LoopingChanged(*status),
52                Event::ShuffleToggled(x) => Event::ShuffleToggled(*x),
53                Event::VolumeChanged(x) => Event::VolumeChanged(*x),
54                Event::PlaybackRateChanged(x) => Event::PlaybackRateChanged(*x),
55                Event::TrackChanged(x) => Event::TrackChanged(x.clone()),
56                Event::Seeked { position_in_us } => Event::Seeked { position_in_us: *position_in_us },
57                Event::TrackAdded(x) => Event::TrackAdded(x.clone()),
58                Event::TrackRemoved(x) => Event::TrackRemoved(x.clone()),
59                Event::TrackMetadataChanged { old_id, new_id } => Event::TrackMetadataChanged { old_id: old_id.clone(), new_id: new_id.clone() },
60                Event::TrackListReplaced => Event::TrackListReplaced,
61            }).unwrap();
62            if matches!(event.unwrap(), Event::PlayerShutDown) {
63                self.sender.close();
64            }
65
66            loop {
67                match self.wakers.1.try_recv() {
68                    Ok(waker) => {
69                        waker.wake_by_ref();
70                    }
71                    _ => break,
72                }
73            }
74        }
75    }
76
77    /// Access to the reciever used to send Events around.
78    pub fn get_reciever(&self) -> Receiver<Event> {
79        self.reciever.clone()
80    }
81}
82
83impl Stream for PlayerEventsStream {
84    type Item = Event;
85
86    fn poll_next(self: std::pin::Pin<&mut Self>, cx: &mut task::Context<'_>) -> task::Poll<Option<Self::Item>> {
87        let reciever = self.reciever.clone();
88        self.wakers.0.try_send(cx.waker().clone()).unwrap();
89        let event = reciever.try_recv();
90        match event {
91            Ok(event) => {
92                cx.waker().wake_by_ref();
93                Poll::Ready(Some(event))
94            },
95            Err(TryRecvError::Empty) => Poll::Pending,
96            Err(TryRecvError::Closed) => Poll::Ready(None),
97        }
98    }
99}