music_player_server/
server.rs

1use futures_channel::mpsc::{unbounded, UnboundedSender};
2use futures_util::{future, pin_mut, stream::TryStreamExt, StreamExt};
3use std::collections::HashMap;
4use std::net::SocketAddr;
5use std::path::PathBuf;
6use std::sync::{self, Arc};
7
8use music_player_playback::player::PlayerCommand;
9use music_player_settings::{read_settings, Settings};
10use music_player_storage::Database;
11use music_player_tracklist::Tracklist as TracklistState;
12use owo_colors::OwoColorize;
13use tokio::net::{TcpListener, TcpStream, UnixListener};
14use tokio::sync::mpsc::UnboundedSender as TokioUnboundedSender;
15use tokio::sync::Mutex;
16use tokio_stream::wrappers::UnixListenerStream;
17use tonic::transport::Server;
18use tungstenite::Message;
19
20use crate::{
21    addons::Addons, core::Core, history::History, library::Library, mixer::Mixer,
22    playback::Playback, playlist::Playlist, tracklist::Tracklist,
23};
24
25use crate::api::music::v1alpha1::{
26    addons_service_server::AddonsServiceServer, core_service_server::CoreServiceServer,
27    history_service_server::HistoryServiceServer, library_service_server::LibraryServiceServer,
28    mixer_service_server::MixerServiceServer, playback_service_server::PlaybackServiceServer,
29    playlist_service_server::PlaylistServiceServer,
30    tracklist_service_server::TracklistServiceServer,
31};
32
/// ASCII-art "MusicPlayer" logo printed once when the gRPC server starts.
///
/// The literal begins with a newline and keeps the trailing padding of every
/// row, so the text is emitted exactly as laid out here.
const BANNER: &str = r#"
    __  ___           _      ____  __                     
   /  |/  /_  _______(_)____/ __ \/ /___ ___  _____  _____
  / /|_/ / / / / ___/ / ___/ /_/ / / __ `/ / / / _ \/ ___/
 / /  / / /_/ (__  ) / /__/ ____/ / /_/ / /_/ /  __/ /    
/_/  /_/\__,_/____/_/\___/_/   /_/\__,_/\__, /\___/_/     
                                       /____/             
"#;
41
42type Tx = UnboundedSender<Message>;
43type PeerMap = Arc<sync::Mutex<HashMap<SocketAddr, Tx>>>;
44
45pub struct MusicPlayerServer {
46    db: Database,
47    tracklist: Arc<std::sync::Mutex<TracklistState>>,
48    cmd_tx: Arc<std::sync::Mutex<TokioUnboundedSender<PlayerCommand>>>,
49    peer_map: PeerMap,
50}
51
52impl MusicPlayerServer {
53    pub fn new(
54        tracklist: Arc<std::sync::Mutex<TracklistState>>,
55        cmd_tx: Arc<std::sync::Mutex<TokioUnboundedSender<PlayerCommand>>>,
56        peer_map: PeerMap,
57        db: Database,
58    ) -> Self {
59        Self {
60            db,
61            tracklist,
62            cmd_tx,
63            peer_map,
64        }
65    }
66
67    pub async fn start(&self) -> Result<(), Box<dyn std::error::Error>> {
68        let config = read_settings().unwrap();
69        let settings = config.try_deserialize::<Settings>().unwrap();
70        let addr: SocketAddr = format!("0.0.0.0:{}", settings.port).parse().unwrap();
71
72        println!("{}", BANNER.magenta());
73        println!("Server listening on {}", addr.cyan());
74        debug!("Listening on {:?}", addr);
75
76        Server::builder()
77            .accept_http1(true)
78            .add_service(tonic_web::enable(AddonsServiceServer::new(Addons::new(
79                self.db.clone(),
80            ))))
81            .add_service(tonic_web::enable(CoreServiceServer::new(Core::default())))
82            .add_service(tonic_web::enable(HistoryServiceServer::new(History::new(
83                self.db.clone(),
84            ))))
85            .add_service(tonic_web::enable(LibraryServiceServer::new(Library::new(
86                self.db.clone(),
87            ))))
88            .add_service(tonic_web::enable(MixerServiceServer::new(Mixer::default())))
89            .add_service(tonic_web::enable(PlaybackServiceServer::new(
90                Playback::new(Arc::clone(&self.tracklist), Arc::clone(&self.cmd_tx)),
91            )))
92            .add_service(tonic_web::enable(PlaylistServiceServer::new(
93                Playlist::new(self.db.clone()),
94            )))
95            .add_service(tonic_web::enable(TracklistServiceServer::new(
96                Tracklist::new(
97                    Arc::clone(&self.tracklist),
98                    Arc::clone(&self.cmd_tx),
99                    self.db.clone(),
100                ),
101            )))
102            .serve(addr)
103            .await?;
104        Ok(())
105    }
106
107    pub async fn start_over_unix_domain_socket(
108        &self,
109        path: &str,
110    ) -> Result<(), Box<dyn std::error::Error>> {
111        let socket_path = PathBuf::from(path);
112
113        if socket_path.exists() {
114            std::fs::remove_file(&socket_path)?;
115        }
116
117        let listener = UnixListener::bind(socket_path)?;
118
119        debug!("Listening on {:?}", listener.local_addr()?);
120
121        Server::builder()
122            .accept_http1(true)
123            .add_service(tonic_web::enable(AddonsServiceServer::new(Addons::new(
124                self.db.clone(),
125            ))))
126            .add_service(tonic_web::enable(CoreServiceServer::new(Core::default())))
127            .add_service(tonic_web::enable(HistoryServiceServer::new(History::new(
128                self.db.clone(),
129            ))))
130            .add_service(tonic_web::enable(LibraryServiceServer::new(Library::new(
131                self.db.clone(),
132            ))))
133            .add_service(tonic_web::enable(MixerServiceServer::new(Mixer::default())))
134            .add_service(tonic_web::enable(PlaybackServiceServer::new(
135                Playback::new(Arc::clone(&self.tracklist), Arc::clone(&self.cmd_tx)),
136            )))
137            .add_service(tonic_web::enable(PlaylistServiceServer::new(
138                Playlist::new(self.db.clone()),
139            )))
140            .add_service(tonic_web::enable(TracklistServiceServer::new(
141                Tracklist::new(
142                    Arc::clone(&self.tracklist),
143                    Arc::clone(&self.cmd_tx),
144                    self.db.clone(),
145                ),
146            )))
147            .serve_with_incoming(UnixListenerStream::new(listener))
148            .await?;
149        Ok(())
150    }
151
152    pub async fn start_ws(&self) -> Result<(), Box<dyn std::error::Error>> {
153        let config = read_settings().unwrap();
154        let settings = config.try_deserialize::<Settings>().unwrap();
155        let addr: SocketAddr = format!("0.0.0.0:{}", settings.ws_port).parse().unwrap();
156
157        let try_socket = TcpListener::bind(addr).await;
158        let listener = try_socket.expect("Failed to bind");
159        println!("Websocket server listening on {}", addr.cyan());
160
161        // Let's spawn the handling of each connection in a separate task.
162        while let Ok((stream, addr)) = listener.accept().await {
163            tokio::spawn(handle_connection(Arc::clone(&self.peer_map), stream, addr));
164        }
165        Ok(())
166    }
167}
168
169async fn handle_connection(peer_map: PeerMap, raw_stream: TcpStream, addr: SocketAddr) {
170    println!("Incoming TCP connection from: {}", addr.bright_green());
171    let ws_stream = tokio_tungstenite::accept_async(raw_stream)
172        .await
173        .expect("Error during the websocket handshake occurred");
174    println!("WebSocket connection established: {}", addr.bright_green());
175
176    let (tx, rx) = unbounded();
177    peer_map.lock().unwrap().insert(addr, tx);
178
179    let (outgoing, incoming) = ws_stream.split();
180
181    let broadcast_incoming = incoming.try_for_each(|msg| {
182        println!(
183            "Received a message from {}: {}",
184            addr,
185            msg.to_text().unwrap()
186        );
187        let peers = peer_map.lock().unwrap();
188        // We want to broadcast the message to everyone except ourselves.
189        let broadcast_recipients = peers.iter().map(|(_, ws_sink)| ws_sink);
190
191        for recp in broadcast_recipients {
192            recp.unbounded_send(msg.clone()).unwrap();
193        }
194
195        future::ok(())
196    });
197
198    let receive_from_others = rx.map(Ok).forward(outgoing);
199
200    pin_mut!(broadcast_incoming, receive_from_others);
201    future::select(broadcast_incoming, receive_from_others).await;
202
203    println!("{} disconnected", &addr);
204    peer_map.lock().unwrap().remove(&addr);
205}