ouverture_core/
server.rs

1use async_stream::try_stream;
2use bincode;
3use futures_core::stream::Stream;
4use serde::{Deserialize, Serialize};
5use std::error::Error;
6use std::sync::{Arc, Mutex};
7use strum_macros::{Display, EnumIter, EnumString};
8use tokio::io::{AsyncReadExt, AsyncWriteExt};
9use tokio::net::{TcpListener, TcpStream};
10
11use crate::config::Config;
12use crate::library::*;
13use crate::music::song::Song;
14use color_eyre::Result;
15
16use log::{debug, error, info, trace, warn};
17
/// TCP control server for the application, together with the client-side
/// helpers used to talk to it.
///
/// All functionality visible in this file is exposed as associated functions
/// (`Server::start`, `Server::send`, `Server::send_wait`), none of which take
/// `self`.
pub struct Server {
    // Not read by any method visible in this file; `start` takes its
    // configuration as an argument instead.
    config: Config,
}
21
22impl Server {
23    pub async fn start(config: &Config) -> Result<()> {
24        let address = config.server_address.clone() + ":" + &config.server_port.to_string();
25        trace!("Starting TCP server on {:?}", &address);
26        let listener = TcpListener::bind(&address).await?;
27        trace!("Server bound to tcp port");
28
29        let stop_flag = Arc::new(Mutex::new(false));
30
31        // accept many clients at the same time
32        loop {
33            let local_stop_flag = stop_flag.clone();
34            let (mut socket, _) = listener.accept().await?;
35            let client_address = socket.peer_addr()?;
36            let client_address = format!("{}", client_address);
37            debug!("New client: {}", client_address);
38
39            let config = config.clone();
40            let handle = tokio::spawn(async move {
41                let mut buf = [0u8; 8];
42
43                // In a loop, read all the data from the socket
44                loop {
45                    match socket.read(&mut buf).await {
46                        // socket closed
47                        Ok(n) if n == 0 => break,
48                        Ok(n) => n,
49                        Err(e) => {
50                            error!("failed to read from socket; err = {:?}", e);
51                            break;
52                        }
53                    };
54
55                    let size = u64::from_ne_bytes(buf);
56
57                    let mut payload = vec![0; size as usize];
58                    let res = socket.read_exact(&mut payload[..]).await;
59                    trace!("res from socket = {:?}", res);
60
61                    let decoded_command = bincode::deserialize::<Command>(&payload);
62                    match decoded_command {
63                        Ok(command) => {
64                            info!("{command} command received");
65                            let res =
66                                Server::reply(Reply::Received(command.to_string()), &mut socket)
67                                    .await;
68                            trace!("Replied 'received': status: {:?}", res);
69                            match command {
70                                Command::Play(i) => (),
71                                Command::Pause => (),
72                                Command::Toggle => (),
73                                Command::Next => (),
74                                Command::Previous => (),
75
76                                Command::Scan => scan(&config).await,
77                                Command::List(i) => {
78                                    let list = list(&config, i).await;
79                                    match Server::reply(Reply::List(list), &mut socket).await {
80                                        Ok(_) => trace!("Replied 'list' successfully"),
81                                        Err(e) => {
82                                            warn!("Failed to send 'list' reply to client: {:?}", e)
83                                        }
84                                    }
85                                }
86
87                                Command::Ping => (),
88                                Command::Restart => (),
89                                Command::Stop => {
90                                    let mut flag = local_stop_flag.lock().unwrap();
91                                    *flag = true;
92                                    break;
93                                }
94                            };
95
96                            match Server::reply(Reply::Done, &mut socket).await {
97                                Ok(_) => trace!("Replied 'done' successfully"),
98                                Err(e) => warn!("Failed to send 'done' to client: {:?}", e),
99                            }
100                        }
101                        Err(e) => warn!("failed to decode message payload; err = {:?}", e),
102                    };
103                }
104                Server::reply(Reply::Done, &mut socket).await; // when exiting because of 'stop'
105                trace!("Terminating tokio thread");
106            });
107            trace!("Waiting on tokio thread join for shutdown...");
108            let res = tokio::join!(handle);
109
110            // in case the Stop command was received, exit the loop.
111            // The binded address is released at 'listener' drop
112            if *stop_flag.lock().unwrap() {
113                break Ok(());
114            }
115        }
116    }
117    pub async fn send_wait(
118        message: &Command,
119        address: &str,
120    ) -> Result<(), Box<dyn Error + Send + Sync>> {
121        let encoded: Vec<u8> = message.prepare_query()?;
122        let mut stream = TcpStream::connect(address).await?;
123        stream.write_all(&encoded).await?;
124
125        // Wait for 'done', displaying all other replies
126        let mut buf = [0u8; 8];
127        loop {
128            match stream.read(&mut buf).await {
129                // socket closed
130                Ok(n) if n == 0 => break,
131                Ok(n) => n,
132                Err(e) => {
133                    error!("failed to read from socket; err = {:?}", e);
134                    break;
135                }
136            };
137
138            let size = u64::from_ne_bytes(buf);
139
140            let mut payload = vec![0; size as usize];
141            let res = stream.read_exact(&mut payload[..]).await;
142            trace!("res from socket = {:?}", res);
143
144            let decoded_reply = bincode::deserialize::<Reply>(&payload);
145            if let Ok(Reply::Done) = decoded_reply {
146                break;
147            }
148        }
149
150        Ok(())
151    }
152
153    pub async fn send<'a>(
154        message: &'a Command,
155        address: &'a str,
156    ) -> impl Stream<Item = Result<Reply, Box<dyn Error + Send + Sync>>> + 'a {
157        try_stream! {
158            let encoded: Vec<u8> = message.prepare_query()?;
159            let mut stream = TcpStream::connect(address).await?;
160            stream.write_all(&encoded).await?;
161             // Wait for 'done', yielding all other replies
162            let mut buf = [0u8; 8];
163            loop {
164                match stream.read(&mut buf).await {
165                    // socket closed
166                    Ok(n) if n == 0 => break,
167                    Ok(n) => n,
168                    Err(e) => {
169                        error!("failed to read from socket; err = {:?}", e);
170                        break;
171                    }
172                };
173
174                let size = u64::from_ne_bytes(buf);
175
176                let mut payload = vec![0; size as usize];
177                let res = stream.read_exact(&mut payload[..]).await;
178                trace!("res from socket = {:?}", res);
179
180                let decoded_reply = bincode::deserialize::<Reply>(&payload)?;
181             match decoded_reply {
182                 Reply::Done => { yield Reply::Done; break },
183                 r => yield r,
184             }
185
186
187            }
188
189        }
190    }
191
192    async fn reply(
193        reply: Reply,
194        stream: &mut TcpStream,
195    ) -> Result<(), Box<dyn Error + Send + Sync>> {
196        let encoded_reply: Vec<u8> = reply.prepare_query()?;
197        stream.write_all(&encoded_reply).await?;
198        Ok(())
199    }
200    // async fn reply(reply: Reply, address: &str) -> Result<(), Box<dyn Error + Send + Sync>> {
201    //        let encoded_reply: Vec<u8> = reply.prepare_query()?;
202    //        let mut stream = TcpStream::connect(address).await?;
203    //        stream.write_all(&encoded_reply).await?;
204    //        Ok(())
205    //    }
206}
207
/// Commands a client can send to the server.
///
/// Values are sent over TCP encoded with `bincode`, and the server echoes the
/// variant's `Display` form back in a `Reply::Received`.
///
/// NOTE(review): bincode presumably identifies variants by their position, so
/// reordering or inserting variants would change the wire format — confirm
/// before editing the list.
#[non_exhaustive]
#[derive(Display, Debug, Serialize, Deserialize, EnumString, EnumIter)]
pub enum Command {
    // "Music" commands
    // The server in this file acknowledges these but takes no further action.
    // NOTE(review): the meaning of `Play`'s optional string is not visible
    // here — confirm against the client.
    Play(Option<String>),
    Pause,
    Toggle,
    Next,
    Previous,

    // "Library" commands
    // `Scan` makes the server call `scan`; `List` makes it call `list` with
    // the optional string and send the result back as a `Reply::List`.
    Scan,
    List(Option<String>),

    // "Server" commands
    // `Ping` and `Restart` are acknowledged only; `Stop` makes the server
    // leave its accept loop.
    Ping,
    Restart,
    Stop,
}
227
/// Messages the server sends back to a client.
#[non_exhaustive]
#[derive(Display, Debug, Serialize, Deserialize, EnumString, EnumIter)]
pub enum Reply {
    /// Sent as soon as a command has been decoded; carries the command's
    /// `to_string()` form.
    Received(String),
    /// Songs returned by `list` in answer to a `Command::List`.
    List(Vec<Song>),
    /// Sent once a command has been handled, and once more when the server
    /// stops serving the connection; clients stop reading when they see it.
    Done,
}
235
236impl Command {
237    fn prepare_query(&self) -> Result<Vec<u8>, Box<dyn Error + Send + Sync>> {
238        // create a 8-bytes prefix: the length of the whole (prefix+message)
239        match bincode::serialized_size(self) {
240            Ok(size) => {
241                let mut message: Vec<u8> = (size as u64).to_ne_bytes().to_vec();
242                // add the serialized content to the message
243                message.extend(bincode::serialize(self).unwrap());
244                Ok(message)
245            }
246            Err(e) => Err(Box::new(e)),
247        }
248    }
249}
250
251impl Reply {
252    fn prepare_query(&self) -> Result<Vec<u8>, Box<dyn Error + Send + Sync>> {
253        // create a 8-bytes prefix: the length of the whole (prefix+message)
254        match bincode::serialized_size(self) {
255            Ok(size) => {
256                let mut message: Vec<u8> = (size as u64).to_ne_bytes().to_vec();
257                // add the serialized content to the message
258                message.extend(bincode::serialize(self).unwrap());
259                Ok(message)
260            }
261            Err(e) => Err(Box::new(e)),
262        }
263    }
264}