1use async_stream::try_stream;
2use bincode;
3use futures_core::stream::Stream;
4use serde::{Deserialize, Serialize};
5use std::error::Error;
6use std::sync::{Arc, Mutex};
7use strum_macros::{Display, EnumIter, EnumString};
8use tokio::io::{AsyncReadExt, AsyncWriteExt};
9use tokio::net::{TcpListener, TcpStream};
10
11use crate::config::Config;
12use crate::library::*;
13use crate::music::song::Song;
14use color_eyre::Result;
15
16use log::{debug, error, info, trace, warn};
17
18pub struct Server {
19 config: Config,
20}
21
22impl Server {
23 pub async fn start(config: &Config) -> Result<()> {
24 let address = config.server_address.clone() + ":" + &config.server_port.to_string();
25 trace!("Starting TCP server on {:?}", &address);
26 let listener = TcpListener::bind(&address).await?;
27 trace!("Server bound to tcp port");
28
29 let stop_flag = Arc::new(Mutex::new(false));
30
31 loop {
33 let local_stop_flag = stop_flag.clone();
34 let (mut socket, _) = listener.accept().await?;
35 let client_address = socket.peer_addr()?;
36 let client_address = format!("{}", client_address);
37 debug!("New client: {}", client_address);
38
39 let config = config.clone();
40 let handle = tokio::spawn(async move {
41 let mut buf = [0u8; 8];
42
43 loop {
45 match socket.read(&mut buf).await {
46 Ok(n) if n == 0 => break,
48 Ok(n) => n,
49 Err(e) => {
50 error!("failed to read from socket; err = {:?}", e);
51 break;
52 }
53 };
54
55 let size = u64::from_ne_bytes(buf);
56
57 let mut payload = vec![0; size as usize];
58 let res = socket.read_exact(&mut payload[..]).await;
59 trace!("res from socket = {:?}", res);
60
61 let decoded_command = bincode::deserialize::<Command>(&payload);
62 match decoded_command {
63 Ok(command) => {
64 info!("{command} command received");
65 let res =
66 Server::reply(Reply::Received(command.to_string()), &mut socket)
67 .await;
68 trace!("Replied 'received': status: {:?}", res);
69 match command {
70 Command::Play(i) => (),
71 Command::Pause => (),
72 Command::Toggle => (),
73 Command::Next => (),
74 Command::Previous => (),
75
76 Command::Scan => scan(&config).await,
77 Command::List(i) => {
78 let list = list(&config, i).await;
79 match Server::reply(Reply::List(list), &mut socket).await {
80 Ok(_) => trace!("Replied 'list' successfully"),
81 Err(e) => {
82 warn!("Failed to send 'list' reply to client: {:?}", e)
83 }
84 }
85 }
86
87 Command::Ping => (),
88 Command::Restart => (),
89 Command::Stop => {
90 let mut flag = local_stop_flag.lock().unwrap();
91 *flag = true;
92 break;
93 }
94 };
95
96 match Server::reply(Reply::Done, &mut socket).await {
97 Ok(_) => trace!("Replied 'done' successfully"),
98 Err(e) => warn!("Failed to send 'done' to client: {:?}", e),
99 }
100 }
101 Err(e) => warn!("failed to decode message payload; err = {:?}", e),
102 };
103 }
104 Server::reply(Reply::Done, &mut socket).await; trace!("Terminating tokio thread");
106 });
107 trace!("Waiting on tokio thread join for shutdown...");
108 let res = tokio::join!(handle);
109
110 if *stop_flag.lock().unwrap() {
113 break Ok(());
114 }
115 }
116 }
117 pub async fn send_wait(
118 message: &Command,
119 address: &str,
120 ) -> Result<(), Box<dyn Error + Send + Sync>> {
121 let encoded: Vec<u8> = message.prepare_query()?;
122 let mut stream = TcpStream::connect(address).await?;
123 stream.write_all(&encoded).await?;
124
125 let mut buf = [0u8; 8];
127 loop {
128 match stream.read(&mut buf).await {
129 Ok(n) if n == 0 => break,
131 Ok(n) => n,
132 Err(e) => {
133 error!("failed to read from socket; err = {:?}", e);
134 break;
135 }
136 };
137
138 let size = u64::from_ne_bytes(buf);
139
140 let mut payload = vec![0; size as usize];
141 let res = stream.read_exact(&mut payload[..]).await;
142 trace!("res from socket = {:?}", res);
143
144 let decoded_reply = bincode::deserialize::<Reply>(&payload);
145 if let Ok(Reply::Done) = decoded_reply {
146 break;
147 }
148 }
149
150 Ok(())
151 }
152
153 pub async fn send<'a>(
154 message: &'a Command,
155 address: &'a str,
156 ) -> impl Stream<Item = Result<Reply, Box<dyn Error + Send + Sync>>> + 'a {
157 try_stream! {
158 let encoded: Vec<u8> = message.prepare_query()?;
159 let mut stream = TcpStream::connect(address).await?;
160 stream.write_all(&encoded).await?;
161 let mut buf = [0u8; 8];
163 loop {
164 match stream.read(&mut buf).await {
165 Ok(n) if n == 0 => break,
167 Ok(n) => n,
168 Err(e) => {
169 error!("failed to read from socket; err = {:?}", e);
170 break;
171 }
172 };
173
174 let size = u64::from_ne_bytes(buf);
175
176 let mut payload = vec![0; size as usize];
177 let res = stream.read_exact(&mut payload[..]).await;
178 trace!("res from socket = {:?}", res);
179
180 let decoded_reply = bincode::deserialize::<Reply>(&payload)?;
181 match decoded_reply {
182 Reply::Done => { yield Reply::Done; break },
183 r => yield r,
184 }
185
186
187 }
188
189 }
190 }
191
192 async fn reply(
193 reply: Reply,
194 stream: &mut TcpStream,
195 ) -> Result<(), Box<dyn Error + Send + Sync>> {
196 let encoded_reply: Vec<u8> = reply.prepare_query()?;
197 stream.write_all(&encoded_reply).await?;
198 Ok(())
199 }
200 }
207
208#[non_exhaustive]
209#[derive(Display, Debug, Serialize, Deserialize, EnumString, EnumIter)]
210pub enum Command {
211 Play(Option<String>),
213 Pause,
214 Toggle,
215 Next,
216 Previous,
217
218 Scan,
220 List(Option<String>),
221
222 Ping,
224 Restart,
225 Stop,
226}
227
228#[non_exhaustive]
229#[derive(Display, Debug, Serialize, Deserialize, EnumString, EnumIter)]
230pub enum Reply {
231 Received(String),
232 List(Vec<Song>),
233 Done,
234}
235
236impl Command {
237 fn prepare_query(&self) -> Result<Vec<u8>, Box<dyn Error + Send + Sync>> {
238 match bincode::serialized_size(self) {
240 Ok(size) => {
241 let mut message: Vec<u8> = (size as u64).to_ne_bytes().to_vec();
242 message.extend(bincode::serialize(self).unwrap());
244 Ok(message)
245 }
246 Err(e) => Err(Box::new(e)),
247 }
248 }
249}
250
251impl Reply {
252 fn prepare_query(&self) -> Result<Vec<u8>, Box<dyn Error + Send + Sync>> {
253 match bincode::serialized_size(self) {
255 Ok(size) => {
256 let mut message: Vec<u8> = (size as u64).to_ne_bytes().to_vec();
257 message.extend(bincode::serialize(self).unwrap());
259 Ok(message)
260 }
261 Err(e) => Err(Box::new(e)),
262 }
263 }
264}