1use futures_channel::mpsc::{unbounded, UnboundedSender};
2use futures_util::{future, pin_mut, stream::TryStreamExt, StreamExt};
3use std::collections::HashMap;
4use std::net::SocketAddr;
5use std::path::PathBuf;
6use std::sync::{self, Arc};
7
8use music_player_playback::player::PlayerCommand;
9use music_player_settings::{read_settings, Settings};
10use music_player_storage::Database;
11use music_player_tracklist::Tracklist as TracklistState;
12use owo_colors::OwoColorize;
13use tokio::net::{TcpListener, TcpStream, UnixListener};
14use tokio::sync::mpsc::UnboundedSender as TokioUnboundedSender;
15use tokio::sync::Mutex;
16use tokio_stream::wrappers::UnixListenerStream;
17use tonic::transport::Server;
18use tungstenite::Message;
19
20use crate::{
21 addons::Addons, core::Core, history::History, library::Library, mixer::Mixer,
22 playback::Playback, playlist::Playlist, tracklist::Tracklist,
23};
24
25use crate::api::music::v1alpha1::{
26 addons_service_server::AddonsServiceServer, core_service_server::CoreServiceServer,
27 history_service_server::HistoryServiceServer, library_service_server::LibraryServiceServer,
28 mixer_service_server::MixerServiceServer, playback_service_server::PlaybackServiceServer,
29 playlist_service_server::PlaylistServiceServer,
30 tracklist_service_server::TracklistServiceServer,
31};
32
/// ASCII-art banner printed to stdout by `MusicPlayerServer::start` before it
/// announces the listening address.
const BANNER: &str = r#"
 __ ___ _ ____ __
 / |/ /_ _______(_)____/ __ \/ /___ ___ _____ _____
 / /|_/ / / / / ___/ / ___/ /_/ / / __ `/ / / / _ \/ ___/
 / / / / /_/ (__ ) / /__/ ____/ / /_/ / /_/ / __/ /
/_/ /_/\__,_/____/_/\___/_/ /_/\__,_/\__, /\___/_/
 /____/
"#;
41
/// Sending half of the unbounded channel that queues WebSocket messages
/// destined for a single connected peer.
type Tx = UnboundedSender<Message>;
/// Registry of connected WebSocket peers, keyed by remote socket address and
/// shared between connection tasks behind a standard-library mutex.
type PeerMap = Arc<sync::Mutex<HashMap<SocketAddr, Tx>>>;
44
45pub struct MusicPlayerServer {
46 db: Database,
47 tracklist: Arc<std::sync::Mutex<TracklistState>>,
48 cmd_tx: Arc<std::sync::Mutex<TokioUnboundedSender<PlayerCommand>>>,
49 peer_map: PeerMap,
50}
51
52impl MusicPlayerServer {
53 pub fn new(
54 tracklist: Arc<std::sync::Mutex<TracklistState>>,
55 cmd_tx: Arc<std::sync::Mutex<TokioUnboundedSender<PlayerCommand>>>,
56 peer_map: PeerMap,
57 db: Database,
58 ) -> Self {
59 Self {
60 db,
61 tracklist,
62 cmd_tx,
63 peer_map,
64 }
65 }
66
67 pub async fn start(&self) -> Result<(), Box<dyn std::error::Error>> {
68 let config = read_settings().unwrap();
69 let settings = config.try_deserialize::<Settings>().unwrap();
70 let addr: SocketAddr = format!("0.0.0.0:{}", settings.port).parse().unwrap();
71
72 println!("{}", BANNER.magenta());
73 println!("Server listening on {}", addr.cyan());
74 debug!("Listening on {:?}", addr);
75
76 Server::builder()
77 .accept_http1(true)
78 .add_service(tonic_web::enable(AddonsServiceServer::new(Addons::new(
79 self.db.clone(),
80 ))))
81 .add_service(tonic_web::enable(CoreServiceServer::new(Core::default())))
82 .add_service(tonic_web::enable(HistoryServiceServer::new(History::new(
83 self.db.clone(),
84 ))))
85 .add_service(tonic_web::enable(LibraryServiceServer::new(Library::new(
86 self.db.clone(),
87 ))))
88 .add_service(tonic_web::enable(MixerServiceServer::new(Mixer::default())))
89 .add_service(tonic_web::enable(PlaybackServiceServer::new(
90 Playback::new(Arc::clone(&self.tracklist), Arc::clone(&self.cmd_tx)),
91 )))
92 .add_service(tonic_web::enable(PlaylistServiceServer::new(
93 Playlist::new(self.db.clone()),
94 )))
95 .add_service(tonic_web::enable(TracklistServiceServer::new(
96 Tracklist::new(
97 Arc::clone(&self.tracklist),
98 Arc::clone(&self.cmd_tx),
99 self.db.clone(),
100 ),
101 )))
102 .serve(addr)
103 .await?;
104 Ok(())
105 }
106
107 pub async fn start_over_unix_domain_socket(
108 &self,
109 path: &str,
110 ) -> Result<(), Box<dyn std::error::Error>> {
111 let socket_path = PathBuf::from(path);
112
113 if socket_path.exists() {
114 std::fs::remove_file(&socket_path)?;
115 }
116
117 let listener = UnixListener::bind(socket_path)?;
118
119 debug!("Listening on {:?}", listener.local_addr()?);
120
121 Server::builder()
122 .accept_http1(true)
123 .add_service(tonic_web::enable(AddonsServiceServer::new(Addons::new(
124 self.db.clone(),
125 ))))
126 .add_service(tonic_web::enable(CoreServiceServer::new(Core::default())))
127 .add_service(tonic_web::enable(HistoryServiceServer::new(History::new(
128 self.db.clone(),
129 ))))
130 .add_service(tonic_web::enable(LibraryServiceServer::new(Library::new(
131 self.db.clone(),
132 ))))
133 .add_service(tonic_web::enable(MixerServiceServer::new(Mixer::default())))
134 .add_service(tonic_web::enable(PlaybackServiceServer::new(
135 Playback::new(Arc::clone(&self.tracklist), Arc::clone(&self.cmd_tx)),
136 )))
137 .add_service(tonic_web::enable(PlaylistServiceServer::new(
138 Playlist::new(self.db.clone()),
139 )))
140 .add_service(tonic_web::enable(TracklistServiceServer::new(
141 Tracklist::new(
142 Arc::clone(&self.tracklist),
143 Arc::clone(&self.cmd_tx),
144 self.db.clone(),
145 ),
146 )))
147 .serve_with_incoming(UnixListenerStream::new(listener))
148 .await?;
149 Ok(())
150 }
151
152 pub async fn start_ws(&self) -> Result<(), Box<dyn std::error::Error>> {
153 let config = read_settings().unwrap();
154 let settings = config.try_deserialize::<Settings>().unwrap();
155 let addr: SocketAddr = format!("0.0.0.0:{}", settings.ws_port).parse().unwrap();
156
157 let try_socket = TcpListener::bind(addr).await;
158 let listener = try_socket.expect("Failed to bind");
159 println!("Websocket server listening on {}", addr.cyan());
160
161 while let Ok((stream, addr)) = listener.accept().await {
163 tokio::spawn(handle_connection(Arc::clone(&self.peer_map), stream, addr));
164 }
165 Ok(())
166 }
167}
168
169async fn handle_connection(peer_map: PeerMap, raw_stream: TcpStream, addr: SocketAddr) {
170 println!("Incoming TCP connection from: {}", addr.bright_green());
171 let ws_stream = tokio_tungstenite::accept_async(raw_stream)
172 .await
173 .expect("Error during the websocket handshake occurred");
174 println!("WebSocket connection established: {}", addr.bright_green());
175
176 let (tx, rx) = unbounded();
177 peer_map.lock().unwrap().insert(addr, tx);
178
179 let (outgoing, incoming) = ws_stream.split();
180
181 let broadcast_incoming = incoming.try_for_each(|msg| {
182 println!(
183 "Received a message from {}: {}",
184 addr,
185 msg.to_text().unwrap()
186 );
187 let peers = peer_map.lock().unwrap();
188 let broadcast_recipients = peers.iter().map(|(_, ws_sink)| ws_sink);
190
191 for recp in broadcast_recipients {
192 recp.unbounded_send(msg.clone()).unwrap();
193 }
194
195 future::ok(())
196 });
197
198 let receive_from_others = rx.map(Ok).forward(outgoing);
199
200 pin_mut!(broadcast_incoming, receive_from_others);
201 future::select(broadcast_incoming, receive_from_others).await;
202
203 println!("{} disconnected", &addr);
204 peer_map.lock().unwrap().remove(&addr);
205}