1use async_stream::stream;
7use async_trait::async_trait;
8use futures::Stream;
9use std::sync::Arc;
10
11use plexus_core::plexus::schema::ChildSummary;
12use plexus_core::plexus::{ChildRouter, PlexusError, PlexusStream};
13use plexus_core::Activation;
14
15use crate::client::MonoClient;
16use crate::player::Player;
17use crate::playlist::PlaylistHub;
18use crate::types::MonoEvent;
19
20#[derive(Clone)]
22pub struct PlayerHub {
23 player: Arc<Player>,
24 playlist: PlaylistHub,
25}
26
27impl PlayerHub {
28 pub async fn new(client: Arc<MonoClient>) -> Self {
30 let player = Player::new(client.clone()).await;
31 let playlist = PlaylistHub::new(player.clone(), client);
32 Self { player, playlist }
33 }
34
35 pub fn plugin_children(&self) -> Vec<ChildSummary> {
37 vec![ChildSummary {
38 namespace: "playlist".into(),
39 description: "Persistent playlist management".into(),
40 hash: String::new(),
41 }]
42 }
43}
44
45#[async_trait]
46impl ChildRouter for PlayerHub {
47 fn router_namespace(&self) -> &str {
48 "player"
49 }
50
51 async fn router_call(
52 &self,
53 method: &str,
54 params: serde_json::Value,
55 ) -> Result<PlexusStream, PlexusError> {
56 self.call(method, params).await
57 }
58
59 async fn get_child(&self, name: &str) -> Option<Box<dyn ChildRouter>> {
60 match name {
61 "playlist" => Some(Box::new(self.playlist.clone())),
62 _ => None,
63 }
64 }
65}
66
67#[plexus_macros::hub_methods(
68 namespace = "player",
69 version = "0.2.0",
70 hub,
71 description = "Playback engine — play, queue, and control audio with playlist management",
72 crate_path = "plexus_core"
73)]
74impl PlayerHub {
75 #[plexus_macros::hub_method(
77 description = "Play a track through speakers. Stops any current playback.",
78 params(
79 id = "Tidal track ID",
80 quality = "Quality: LOSSLESS (default), HI_RES_LOSSLESS, HIGH, LOW"
81 )
82 )]
83 pub async fn play(
84 &self,
85 id: u64,
86 quality: Option<String>,
87 ) -> impl Stream<Item = MonoEvent> + Send + 'static {
88 let player = self.player.clone();
89 let quality = quality.unwrap_or_else(|| "LOSSLESS".to_string());
90 stream! {
91 match player.play_track(id, &quality).await {
92 Ok(()) => yield MonoEvent::PlayerAck {
93 action: "play".to_string(),
94 message: format!("playing track {id}"),
95 },
96 Err(e) => yield MonoEvent::Error { message: e },
97 }
98 }
99 }
100
101 #[plexus_macros::hub_method(
103 description = "Pause the current playback"
104 )]
105 pub async fn pause(&self) -> impl Stream<Item = MonoEvent> + Send + 'static {
106 let player = self.player.clone();
107 stream! {
108 player.pause().await;
109 yield MonoEvent::PlayerAck {
110 action: "pause".to_string(),
111 message: "playback paused".to_string(),
112 };
113 }
114 }
115
116 #[plexus_macros::hub_method(
118 description = "Resume paused playback"
119 )]
120 pub async fn resume(&self) -> impl Stream<Item = MonoEvent> + Send + 'static {
121 let player = self.player.clone();
122 stream! {
123 player.resume().await;
124 yield MonoEvent::PlayerAck {
125 action: "resume".to_string(),
126 message: "playback resumed".to_string(),
127 };
128 }
129 }
130
131 #[plexus_macros::hub_method(
133 description = "Stop playback and clear current track"
134 )]
135 pub async fn stop(&self) -> impl Stream<Item = MonoEvent> + Send + 'static {
136 let player = self.player.clone();
137 stream! {
138 player.stop().await;
139 yield MonoEvent::PlayerAck {
140 action: "stop".to_string(),
141 message: "playback stopped".to_string(),
142 };
143 }
144 }
145
146 #[plexus_macros::hub_method(
148 description = "Skip to the next track in the queue"
149 )]
150 pub async fn next(&self) -> impl Stream<Item = MonoEvent> + Send + 'static {
151 let player = self.player.clone();
152 stream! {
153 match player.next().await {
154 Ok(()) => yield MonoEvent::PlayerAck {
155 action: "next".to_string(),
156 message: "skipped to next track".to_string(),
157 },
158 Err(e) => yield MonoEvent::Error { message: e },
159 }
160 }
161 }
162
163 #[plexus_macros::hub_method(
165 description = "Go back to the previous track"
166 )]
167 pub async fn previous(&self) -> impl Stream<Item = MonoEvent> + Send + 'static {
168 let player = self.player.clone();
169 stream! {
170 match player.previous().await {
171 Ok(()) => yield MonoEvent::PlayerAck {
172 action: "previous".to_string(),
173 message: "went to previous track".to_string(),
174 },
175 Err(e) => yield MonoEvent::Error { message: e },
176 }
177 }
178 }
179
180 #[plexus_macros::hub_method(
182 description = "Set playback volume",
183 params(level = "Volume level from 0.0 (mute) to 1.0 (full)")
184 )]
185 pub async fn volume(
186 &self,
187 level: f32,
188 ) -> impl Stream<Item = MonoEvent> + Send + 'static {
189 let player = self.player.clone();
190 stream! {
191 player.set_volume(level).await;
192 yield MonoEvent::PlayerAck {
193 action: "volume".to_string(),
194 message: format!("volume set to {:.0}%", level * 100.0),
195 };
196 }
197 }
198
199 #[plexus_macros::hub_method(
201 description = "Set pre-amp gain. Values above 1.0 boost the signal (max 4.0). Effective volume = preamp × volume.",
202 params(level = "Gain level from 0.0 (silent) to 4.0 (4× boost)")
203 )]
204 pub async fn preamp(
205 &self,
206 level: f32,
207 ) -> impl Stream<Item = MonoEvent> + Send + 'static {
208 let player = self.player.clone();
209 stream! {
210 player.set_preamp(level).await;
211 yield MonoEvent::PlayerAck {
212 action: "preamp".to_string(),
213 message: format!("preamp set to {:.1}×", level.clamp(0.0, 4.0)),
214 };
215 }
216 }
217
218 #[plexus_macros::hub_method(
220 streaming,
221 description = "Add all tracks from an album to the queue. Auto-starts if nothing is playing.",
222 params(
223 id = "Tidal album ID",
224 quality = "Quality: LOSSLESS (default), HI_RES_LOSSLESS, HIGH, LOW"
225 )
226 )]
227 pub async fn queue_album(
228 &self,
229 id: u64,
230 quality: Option<String>,
231 ) -> impl Stream<Item = MonoEvent> + Send + 'static {
232 let player = self.player.clone();
233 let quality = quality.unwrap_or_else(|| "LOSSLESS".to_string());
234 stream! {
235 match player.queue_album(id, &quality).await {
236 Ok(tracks) => {
237 let count = tracks.len();
238 yield MonoEvent::PlayerAck {
239 action: "queue_album".to_string(),
240 message: format!("{count} tracks queued"),
241 };
242 let current_index = Some(0usize);
243 yield MonoEvent::Queue {
244 tracks,
245 current_index,
246 };
247 }
248 Err(e) => yield MonoEvent::Error { message: e },
249 }
250 }
251 }
252
253 #[plexus_macros::hub_method(
255 description = "Add a track to the end of the playback queue. Auto-starts if nothing is playing.",
256 params(
257 id = "Tidal track ID",
258 quality = "Quality: LOSSLESS (default), HI_RES_LOSSLESS, HIGH, LOW"
259 )
260 )]
261 pub async fn queue_add(
262 &self,
263 id: u64,
264 quality: Option<String>,
265 ) -> impl Stream<Item = MonoEvent> + Send + 'static {
266 let player = self.player.clone();
267 let quality = quality.unwrap_or_else(|| "LOSSLESS".to_string());
268 stream! {
269 match player.queue_add(id, &quality).await {
270 Ok(()) => yield MonoEvent::PlayerAck {
271 action: "queue_add".to_string(),
272 message: format!("track {id} added to queue"),
273 },
274 Err(e) => yield MonoEvent::Error { message: e },
275 }
276 }
277 }
278
279 #[plexus_macros::hub_method(
281 streaming,
282 description = "Add multiple tracks by ID in one call. Resolves metadata in parallel. Auto-starts if nothing is playing.",
283 params(
284 ids = "List of Tidal track IDs",
285 quality = "Quality: LOSSLESS (default), HI_RES_LOSSLESS, HIGH, LOW"
286 )
287 )]
288 pub async fn queue_batch(
289 &self,
290 ids: Vec<u64>,
291 quality: Option<String>,
292 ) -> impl Stream<Item = MonoEvent> + Send + 'static {
293 let player = self.player.clone();
294 let quality = quality.unwrap_or_else(|| "LOSSLESS".to_string());
295 stream! {
296 match player.queue_batch(&ids, &quality).await {
297 Ok(tracks) => {
298 let count = tracks.len();
299 yield MonoEvent::PlayerAck {
300 action: "queue_batch".to_string(),
301 message: format!("{count} tracks queued"),
302 };
303 yield MonoEvent::Queue {
304 tracks,
305 current_index: Some(0),
306 };
307 }
308 Err(e) => yield MonoEvent::Error { message: e },
309 }
310 }
311 }
312
313 #[plexus_macros::hub_method(
315 description = "Clear all tracks from the queue (does not stop current track)"
316 )]
317 pub async fn queue_clear(&self) -> impl Stream<Item = MonoEvent> + Send + 'static {
318 let player = self.player.clone();
319 stream! {
320 player.queue_clear().await;
321 yield MonoEvent::PlayerAck {
322 action: "queue_clear".to_string(),
323 message: "queue cleared".to_string(),
324 };
325 }
326 }
327
328 #[plexus_macros::hub_method(
330 description = "Get the current queue contents including the now-playing track"
331 )]
332 pub async fn queue_get(&self) -> impl Stream<Item = MonoEvent> + Send + 'static {
333 let player = self.player.clone();
334 stream! {
335 let (current, upcoming) = player.queue_get().await;
336 let current_index = if current.is_some() { Some(0usize) } else { None };
337 let mut tracks = Vec::new();
338 if let Some(c) = current {
339 tracks.push(c);
340 }
341 tracks.extend(upcoming);
342 yield MonoEvent::Queue {
343 tracks,
344 current_index,
345 };
346 }
347 }
348
349 #[plexus_macros::hub_method(
351 description = "Move a track within the queue",
352 params(
353 from = "Source index in the queue (0-based)",
354 to = "Destination index in the queue (0-based)"
355 )
356 )]
357 pub async fn queue_reorder(
358 &self,
359 from: u32,
360 to: u32,
361 ) -> impl Stream<Item = MonoEvent> + Send + 'static {
362 let player = self.player.clone();
363 stream! {
364 match player.queue_reorder(from as usize, to as usize).await {
365 Ok(()) => yield MonoEvent::PlayerAck {
366 action: "queue_reorder".to_string(),
367 message: format!("moved track from position {from} to {to}"),
368 },
369 Err(e) => yield MonoEvent::Error { message: e },
370 }
371 }
372 }
373
374 #[plexus_macros::hub_method(
376 description = "Get current playback status — track, position, volume, queue length"
377 )]
378 pub async fn status(&self) -> impl Stream<Item = MonoEvent> + Send + 'static {
379 let rx = self.player.subscribe_now_playing();
380 let np = rx.borrow().clone();
381 stream! {
382 yield MonoEvent::NowPlaying {
383 track_id: np.track_id,
384 title: np.title,
385 artist: np.artist,
386 album: np.album,
387 status: np.status,
388 position_secs: np.position_secs,
389 duration_secs: np.duration_secs,
390 volume: np.volume,
391 preamp: np.preamp,
392 queue_length: np.queue_length,
393 url: np.url,
394 };
395 }
396 }
397
398 #[plexus_macros::hub_method(
400 streaming,
401 description = "Stream real-time playback position and status updates (~1s interval while playing)"
402 )]
403 pub async fn now_playing(&self) -> impl Stream<Item = MonoEvent> + Send + 'static {
404 let mut rx = self.player.subscribe_now_playing();
405 stream! {
406 {
408 let np = rx.borrow().clone();
409 yield MonoEvent::NowPlaying {
410 track_id: np.track_id,
411 title: np.title,
412 artist: np.artist,
413 album: np.album,
414 status: np.status,
415 position_secs: np.position_secs,
416 duration_secs: np.duration_secs,
417 volume: np.volume,
418 preamp: np.preamp,
419 queue_length: np.queue_length,
420 url: np.url,
421 };
422 }
423 while rx.changed().await.is_ok() {
425 let np = rx.borrow().clone();
426 yield MonoEvent::NowPlaying {
427 track_id: np.track_id,
428 title: np.title,
429 artist: np.artist,
430 album: np.album,
431 status: np.status,
432 position_secs: np.position_secs,
433 duration_secs: np.duration_secs,
434 volume: np.volume,
435 preamp: np.preamp,
436 queue_length: np.queue_length,
437 url: np.url,
438 };
439 }
440 }
441 }
442}