1use std::{task::{Waker, Poll}, thread};
5
6use async_std::{channel::{Sender, Receiver, unbounded, TryRecvError}, task, stream::Stream};
7use mpris::{Player, Event, PlayerFinder};
8
/// Handle to a stream of MPRIS [`Event`]s for a single player.
///
/// `new` spawns a background thread that forwards the player's events into an
/// unbounded channel; this struct holds both ends of that channel. Cloning the
/// struct clones the channel handles, so clones share the same queue.
#[derive(Debug, Clone)]
pub struct PlayerEventsStream {
    /// Identity string copied from the `Player` passed to `new`; the listener
    /// thread uses it to look the player up again via `find_by_name`.
    identity: String,
    /// Sending half of the event channel, written to by `events_listener`.
    sender: Sender<Event>,
    /// Receiving half of the event channel, read by `poll_next` and handed
    /// out (as a clone) by `get_reciever`.
    reciever: Receiver<Event>,
    /// Channel of task wakers: `poll_next` pushes the current task's waker
    /// into `.0`, and `events_listener` drains `.1` to wake those tasks.
    wakers: (Sender<Waker>, Receiver<Waker>),
}
19
20impl PlayerEventsStream {
21 pub fn new(player: &Player) -> PlayerEventsStream {
25 let (s, r) = unbounded();
26 let (wake_send, wake_reciev) = unbounded();
27 let streamer = PlayerEventsStream {identity: player.identity().to_string(), sender: s, reciever: r, wakers: (wake_send, wake_reciev)};
28 let stream_clone = streamer.clone();
29 thread::spawn(move || stream_clone.events_listener());
30 return streamer;
31 }
32
33 fn events_listener(self) {
34 let finder = match PlayerFinder::new() {
35 Ok(x) => x,
36 Err(_) => {
37 self.sender.try_send(Event::PlayerShutDown).unwrap();
38 self.sender.close();
39 return;
40 },
41 };
42 let player = finder.find_by_name(&self.identity).unwrap();
43 let events = player.events().unwrap();
44
45 for event in events {
46 self.sender.try_send(match event.as_ref().unwrap() {
47 Event::PlayerShutDown => Event::PlayerShutDown,
48 Event::Paused => Event::Paused,
49 Event::Playing => Event::Playing,
50 Event::Stopped => Event::Stopped,
51 Event::LoopingChanged(status) => Event::LoopingChanged(*status),
52 Event::ShuffleToggled(x) => Event::ShuffleToggled(*x),
53 Event::VolumeChanged(x) => Event::VolumeChanged(*x),
54 Event::PlaybackRateChanged(x) => Event::PlaybackRateChanged(*x),
55 Event::TrackChanged(x) => Event::TrackChanged(x.clone()),
56 Event::Seeked { position_in_us } => Event::Seeked { position_in_us: *position_in_us },
57 Event::TrackAdded(x) => Event::TrackAdded(x.clone()),
58 Event::TrackRemoved(x) => Event::TrackRemoved(x.clone()),
59 Event::TrackMetadataChanged { old_id, new_id } => Event::TrackMetadataChanged { old_id: old_id.clone(), new_id: new_id.clone() },
60 Event::TrackListReplaced => Event::TrackListReplaced,
61 }).unwrap();
62 if matches!(event.unwrap(), Event::PlayerShutDown) {
63 self.sender.close();
64 }
65
66 loop {
67 match self.wakers.1.try_recv() {
68 Ok(waker) => {
69 waker.wake_by_ref();
70 }
71 _ => break,
72 }
73 }
74 }
75 }
76
77 pub fn get_reciever(&self) -> Receiver<Event> {
79 self.reciever.clone()
80 }
81}
82
83impl Stream for PlayerEventsStream {
84 type Item = Event;
85
86 fn poll_next(self: std::pin::Pin<&mut Self>, cx: &mut task::Context<'_>) -> task::Poll<Option<Self::Item>> {
87 let reciever = self.reciever.clone();
88 self.wakers.0.try_send(cx.waker().clone()).unwrap();
89 let event = reciever.try_recv();
90 match event {
91 Ok(event) => {
92 cx.waker().wake_by_ref();
93 Poll::Ready(Some(event))
94 },
95 Err(TryRecvError::Empty) => Poll::Pending,
96 Err(TryRecvError::Closed) => Poll::Ready(None),
97 }
98 }
99}