Skip to main content

plexus_mono/
player_hub.rs

1//! PlayerHub — Plexus RPC activation for stateful playback
2//!
3//! Owns the audio playback engine, queue, and playlist child router.
4//! Requires speakers. Registered as a hub activation under `monochrome`.
5
6use async_stream::stream;
7use async_trait::async_trait;
8use futures::Stream;
9use std::sync::Arc;
10
11use plexus_core::plexus::schema::ChildSummary;
12use plexus_core::plexus::{ChildRouter, PlexusError, PlexusStream};
13use plexus_core::Activation;
14
15use crate::client::MonoClient;
16use crate::player::Player;
17use crate::playlist::PlaylistHub;
18use crate::types::MonoEvent;
19
20/// Stateful playback engine activation with queue and playlist management.
21#[derive(Clone)]
22pub struct PlayerHub {
23    player: Arc<Player>,
24    playlist: PlaylistHub,
25}
26
27impl PlayerHub {
28    /// Create a new PlayerHub from a shared MonoClient.
29    pub async fn new(client: Arc<MonoClient>) -> Self {
30        let player = Player::new(client.clone()).await;
31        let playlist = PlaylistHub::new(player.clone(), client);
32        Self { player, playlist }
33    }
34
35    /// Return child activation summaries for schema discovery
36    pub fn plugin_children(&self) -> Vec<ChildSummary> {
37        vec![ChildSummary {
38            namespace: "playlist".into(),
39            description: "Persistent playlist management".into(),
40            hash: String::new(),
41        }]
42    }
43}
44
45#[async_trait]
46impl ChildRouter for PlayerHub {
47    fn router_namespace(&self) -> &str {
48        "player"
49    }
50
51    async fn router_call(
52        &self,
53        method: &str,
54        params: serde_json::Value,
55    ) -> Result<PlexusStream, PlexusError> {
56        self.call(method, params).await
57    }
58
59    async fn get_child(&self, name: &str) -> Option<Box<dyn ChildRouter>> {
60        match name {
61            "playlist" => Some(Box::new(self.playlist.clone())),
62            _ => None,
63        }
64    }
65}
66
67#[plexus_macros::hub_methods(
68    namespace = "player",
69    version = "0.2.0",
70    hub,
71    description = "Playback engine — play, queue, and control audio with playlist management",
72    crate_path = "plexus_core"
73)]
74impl PlayerHub {
75    /// Play a track immediately (stops current playback)
76    #[plexus_macros::hub_method(
77        description = "Play a track through speakers. Stops any current playback.",
78        params(
79            id = "Tidal track ID",
80            quality = "Quality: LOSSLESS (default), HI_RES_LOSSLESS, HIGH, LOW"
81        )
82    )]
83    pub async fn play(
84        &self,
85        id: u64,
86        quality: Option<String>,
87    ) -> impl Stream<Item = MonoEvent> + Send + 'static {
88        let player = self.player.clone();
89        let quality = quality.unwrap_or_else(|| "LOSSLESS".to_string());
90        stream! {
91            match player.play_track(id, &quality).await {
92                Ok(()) => yield MonoEvent::PlayerAck {
93                    action: "play".to_string(),
94                    message: format!("playing track {id}"),
95                },
96                Err(e) => yield MonoEvent::Error { message: e },
97            }
98        }
99    }
100
101    /// Pause playback
102    #[plexus_macros::hub_method(
103        description = "Pause the current playback"
104    )]
105    pub async fn pause(&self) -> impl Stream<Item = MonoEvent> + Send + 'static {
106        let player = self.player.clone();
107        stream! {
108            player.pause().await;
109            yield MonoEvent::PlayerAck {
110                action: "pause".to_string(),
111                message: "playback paused".to_string(),
112            };
113        }
114    }
115
116    /// Resume playback
117    #[plexus_macros::hub_method(
118        description = "Resume paused playback"
119    )]
120    pub async fn resume(&self) -> impl Stream<Item = MonoEvent> + Send + 'static {
121        let player = self.player.clone();
122        stream! {
123            player.resume().await;
124            yield MonoEvent::PlayerAck {
125                action: "resume".to_string(),
126                message: "playback resumed".to_string(),
127            };
128        }
129    }
130
131    /// Stop playback
132    #[plexus_macros::hub_method(
133        description = "Stop playback and clear current track"
134    )]
135    pub async fn stop(&self) -> impl Stream<Item = MonoEvent> + Send + 'static {
136        let player = self.player.clone();
137        stream! {
138            player.stop().await;
139            yield MonoEvent::PlayerAck {
140                action: "stop".to_string(),
141                message: "playback stopped".to_string(),
142            };
143        }
144    }
145
146    /// Skip to next track in queue
147    #[plexus_macros::hub_method(
148        description = "Skip to the next track in the queue"
149    )]
150    pub async fn next(&self) -> impl Stream<Item = MonoEvent> + Send + 'static {
151        let player = self.player.clone();
152        stream! {
153            match player.next().await {
154                Ok(()) => yield MonoEvent::PlayerAck {
155                    action: "next".to_string(),
156                    message: "skipped to next track".to_string(),
157                },
158                Err(e) => yield MonoEvent::Error { message: e },
159            }
160        }
161    }
162
163    /// Go to previous track
164    #[plexus_macros::hub_method(
165        description = "Go back to the previous track"
166    )]
167    pub async fn previous(&self) -> impl Stream<Item = MonoEvent> + Send + 'static {
168        let player = self.player.clone();
169        stream! {
170            match player.previous().await {
171                Ok(()) => yield MonoEvent::PlayerAck {
172                    action: "previous".to_string(),
173                    message: "went to previous track".to_string(),
174                },
175                Err(e) => yield MonoEvent::Error { message: e },
176            }
177        }
178    }
179
180    /// Set volume level
181    #[plexus_macros::hub_method(
182        description = "Set playback volume",
183        params(level = "Volume level from 0.0 (mute) to 1.0 (full)")
184    )]
185    pub async fn volume(
186        &self,
187        level: f32,
188    ) -> impl Stream<Item = MonoEvent> + Send + 'static {
189        let player = self.player.clone();
190        stream! {
191            player.set_volume(level).await;
192            yield MonoEvent::PlayerAck {
193                action: "volume".to_string(),
194                message: format!("volume set to {:.0}%", level * 100.0),
195            };
196        }
197    }
198
199    /// Set pre-amp gain level
200    #[plexus_macros::hub_method(
201        description = "Set pre-amp gain. Values above 1.0 boost the signal (max 4.0). Effective volume = preamp × volume.",
202        params(level = "Gain level from 0.0 (silent) to 4.0 (4× boost)")
203    )]
204    pub async fn preamp(
205        &self,
206        level: f32,
207    ) -> impl Stream<Item = MonoEvent> + Send + 'static {
208        let player = self.player.clone();
209        stream! {
210            player.set_preamp(level).await;
211            yield MonoEvent::PlayerAck {
212                action: "preamp".to_string(),
213                message: format!("preamp set to {:.1}×", level.clamp(0.0, 4.0)),
214            };
215        }
216    }
217
218    /// Add an entire album to the playback queue
219    #[plexus_macros::hub_method(
220        streaming,
221        description = "Add all tracks from an album to the queue. Auto-starts if nothing is playing.",
222        params(
223            id = "Tidal album ID",
224            quality = "Quality: LOSSLESS (default), HI_RES_LOSSLESS, HIGH, LOW"
225        )
226    )]
227    pub async fn queue_album(
228        &self,
229        id: u64,
230        quality: Option<String>,
231    ) -> impl Stream<Item = MonoEvent> + Send + 'static {
232        let player = self.player.clone();
233        let quality = quality.unwrap_or_else(|| "LOSSLESS".to_string());
234        stream! {
235            match player.queue_album(id, &quality).await {
236                Ok(tracks) => {
237                    let count = tracks.len();
238                    yield MonoEvent::PlayerAck {
239                        action: "queue_album".to_string(),
240                        message: format!("{count} tracks queued"),
241                    };
242                    let current_index = Some(0usize);
243                    yield MonoEvent::Queue {
244                        tracks,
245                        current_index,
246                    };
247                }
248                Err(e) => yield MonoEvent::Error { message: e },
249            }
250        }
251    }
252
253    /// Add a track to the playback queue
254    #[plexus_macros::hub_method(
255        description = "Add a track to the end of the playback queue. Auto-starts if nothing is playing.",
256        params(
257            id = "Tidal track ID",
258            quality = "Quality: LOSSLESS (default), HI_RES_LOSSLESS, HIGH, LOW"
259        )
260    )]
261    pub async fn queue_add(
262        &self,
263        id: u64,
264        quality: Option<String>,
265    ) -> impl Stream<Item = MonoEvent> + Send + 'static {
266        let player = self.player.clone();
267        let quality = quality.unwrap_or_else(|| "LOSSLESS".to_string());
268        stream! {
269            match player.queue_add(id, &quality).await {
270                Ok(()) => yield MonoEvent::PlayerAck {
271                    action: "queue_add".to_string(),
272                    message: format!("track {id} added to queue"),
273                },
274                Err(e) => yield MonoEvent::Error { message: e },
275            }
276        }
277    }
278
279    /// Add multiple tracks to the queue at once
280    #[plexus_macros::hub_method(
281        streaming,
282        description = "Add multiple tracks by ID in one call. Resolves metadata in parallel. Auto-starts if nothing is playing.",
283        params(
284            ids = "List of Tidal track IDs",
285            quality = "Quality: LOSSLESS (default), HI_RES_LOSSLESS, HIGH, LOW"
286        )
287    )]
288    pub async fn queue_batch(
289        &self,
290        ids: Vec<u64>,
291        quality: Option<String>,
292    ) -> impl Stream<Item = MonoEvent> + Send + 'static {
293        let player = self.player.clone();
294        let quality = quality.unwrap_or_else(|| "LOSSLESS".to_string());
295        stream! {
296            match player.queue_batch(&ids, &quality).await {
297                Ok(tracks) => {
298                    let count = tracks.len();
299                    yield MonoEvent::PlayerAck {
300                        action: "queue_batch".to_string(),
301                        message: format!("{count} tracks queued"),
302                    };
303                    yield MonoEvent::Queue {
304                        tracks,
305                        current_index: Some(0),
306                    };
307                }
308                Err(e) => yield MonoEvent::Error { message: e },
309            }
310        }
311    }
312
313    /// Clear the playback queue
314    #[plexus_macros::hub_method(
315        description = "Clear all tracks from the queue (does not stop current track)"
316    )]
317    pub async fn queue_clear(&self) -> impl Stream<Item = MonoEvent> + Send + 'static {
318        let player = self.player.clone();
319        stream! {
320            player.queue_clear().await;
321            yield MonoEvent::PlayerAck {
322                action: "queue_clear".to_string(),
323                message: "queue cleared".to_string(),
324            };
325        }
326    }
327
328    /// List queue contents
329    #[plexus_macros::hub_method(
330        description = "Get the current queue contents including the now-playing track"
331    )]
332    pub async fn queue_get(&self) -> impl Stream<Item = MonoEvent> + Send + 'static {
333        let player = self.player.clone();
334        stream! {
335            let (current, upcoming) = player.queue_get().await;
336            let current_index = if current.is_some() { Some(0usize) } else { None };
337            let mut tracks = Vec::new();
338            if let Some(c) = current {
339                tracks.push(c);
340            }
341            tracks.extend(upcoming);
342            yield MonoEvent::Queue {
343                tracks,
344                current_index,
345            };
346        }
347    }
348
349    /// Reorder tracks in the queue
350    #[plexus_macros::hub_method(
351        description = "Move a track within the queue",
352        params(
353            from = "Source index in the queue (0-based)",
354            to = "Destination index in the queue (0-based)"
355        )
356    )]
357    pub async fn queue_reorder(
358        &self,
359        from: u32,
360        to: u32,
361    ) -> impl Stream<Item = MonoEvent> + Send + 'static {
362        let player = self.player.clone();
363        stream! {
364            match player.queue_reorder(from as usize, to as usize).await {
365                Ok(()) => yield MonoEvent::PlayerAck {
366                    action: "queue_reorder".to_string(),
367                    message: format!("moved track from position {from} to {to}"),
368                },
369                Err(e) => yield MonoEvent::Error { message: e },
370            }
371        }
372    }
373
374    /// Get current playback status (single snapshot, returns immediately)
375    #[plexus_macros::hub_method(
376        description = "Get current playback status — track, position, volume, queue length"
377    )]
378    pub async fn status(&self) -> impl Stream<Item = MonoEvent> + Send + 'static {
379        let rx = self.player.subscribe_now_playing();
380        let np = rx.borrow().clone();
381        stream! {
382            yield MonoEvent::NowPlaying {
383                track_id: np.track_id,
384                title: np.title,
385                artist: np.artist,
386                album: np.album,
387                status: np.status,
388                position_secs: np.position_secs,
389                duration_secs: np.duration_secs,
390                volume: np.volume,
391                preamp: np.preamp,
392                queue_length: np.queue_length,
393                url: np.url,
394            };
395        }
396    }
397
398    /// Stream now-playing updates (~1s while playing)
399    #[plexus_macros::hub_method(
400        streaming,
401        description = "Stream real-time playback position and status updates (~1s interval while playing)"
402    )]
403    pub async fn now_playing(&self) -> impl Stream<Item = MonoEvent> + Send + 'static {
404        let mut rx = self.player.subscribe_now_playing();
405        stream! {
406            // Emit current state immediately
407            {
408                let np = rx.borrow().clone();
409                yield MonoEvent::NowPlaying {
410                    track_id: np.track_id,
411                    title: np.title,
412                    artist: np.artist,
413                    album: np.album,
414                    status: np.status,
415                    position_secs: np.position_secs,
416                    duration_secs: np.duration_secs,
417                    volume: np.volume,
418                    preamp: np.preamp,
419                    queue_length: np.queue_length,
420                    url: np.url,
421                };
422            }
423            // Then stream updates
424            while rx.changed().await.is_ok() {
425                let np = rx.borrow().clone();
426                yield MonoEvent::NowPlaying {
427                    track_id: np.track_id,
428                    title: np.title,
429                    artist: np.artist,
430                    album: np.album,
431                    status: np.status,
432                    position_secs: np.position_secs,
433                    duration_secs: np.duration_secs,
434                    volume: np.volume,
435                    preamp: np.preamp,
436                    queue_length: np.queue_length,
437                    url: np.url,
438                };
439            }
440        }
441    }
442}