snapcast_control/communication.rs
1use stubborn_io::StubbornTcpStream;
2use uuid::Uuid;
3
4use crate::{
5 errors,
6 protocol::{self, client, group, server, stream, Request, RequestMethod, SentRequests},
7 state::WrappedState,
8 Message, Method, ValidMessage,
9};
10
11type Sender =
12 futures::stream::SplitSink<tokio_util::codec::Framed<StubbornTcpStream<std::net::SocketAddr>, Communication>, Method>;
13type Receiver =
14 futures::stream::SplitStream<tokio_util::codec::Framed<StubbornTcpStream<std::net::SocketAddr>, Communication>>;
15
16/// Struct representing a connection to a Snapcast server.
17/// Contains the current state of the server and methods to interact with it.
18///
19/// call `SnapcastConnection::open` to create a new connection.
20pub struct SnapcastConnection {
21 /// The current state of the server. The state is Send + Sync, so it can be shared between threads.
22 pub state: WrappedState,
23
24 // internal
25 sender: Sender,
26 receiver: Receiver,
27}
28
29impl SnapcastConnection {
30 /// open a new connection to a Snapcast server
31 ///
32 /// # args
33 /// `address`: [std::net::SocketAddr] - the address of the Snapcast server
34 ///
35 /// # returns
36 /// a new [SnapcastConnection] struct
37 ///
38 /// # example
39 /// ```no_run
40 /// let mut client = SnapcastConnection::open("127.0.0.1:1705".parse().expect("could not parse socket address")).await;
41 /// ```
42 pub async fn open(address: std::net::SocketAddr) -> Self {
43 let state = WrappedState::default();
44 let (sender, receiver) = Communication::init(address).await;
45
46 Self {
47 state,
48 sender,
49 receiver,
50 }
51 }
52
53 /// send a raw command to the Snapcast server
54 ///
55 /// # args
56 /// `command`: [Method] - the command to send
57 ///
58 /// # returns
59 /// an empty [Ok] if the command was sent successfully, or a [ClientError] if there was an error
60 ///
61 /// # example
62 /// ```no_run
63 /// client.send(Method::ServerGetStatus).await.expect("could not send command");
64 /// ```
65 pub async fn send(&mut self, command: Method) -> Result<(), ClientError> {
66 use futures::SinkExt;
67
68 self.sender.send(command).await
69 }
70
71 /// receive a message from the Snapcast server
72 ///
73 /// uses a [futures::stream::Next] under the hood, so: \
74 /// creates a future that resolves to the next item in the stream
75 ///
76 /// # returns
77 /// an [Option] containing an [Ok] with a [ValidMessage] if a message was received, \
78 /// an [Option] containing an [Err] with a [ClientError] if there was an error, \
79 /// or [None] if the stream has ended
80 ///
81 /// # example
82 /// ```no_run
83 /// let message = client.recv().await.expect("could not receive message");
84 /// ```
85 pub async fn recv(&mut self) -> Option<Result<ValidMessage, ClientError>> {
86 use futures::StreamExt;
87
88 let message = self.receiver.next().await;
89
90 if let Some(Ok(message)) = message {
91 match &message {
92 Message::Error { error, .. } => return Some(Err(error.clone().into())),
93 Message::Result { result, .. } => self.state.handle_result(*result.clone()),
94 Message::Notification { method, .. } => self.state.handle_notification(*method.clone()),
95 };
96
97 Some(Ok(
98 message
99 .try_into()
100 .expect("this should never fail bc error has returned already"),
101 ))
102 } else if let Some(Err(err)) = message {
103 Some(Err(err))
104 } else {
105 None
106 }
107 }
108
109 // client methods
110 /// request the current status of a client from the Snapcast server
111 ///
112 /// wrapper for sending a [ClientGetStatus](Method::ClientGetStatus) command
113 ///
114 /// # args
115 /// `id`: [String] - the id of the client
116 ///
117 /// # returns
118 /// an empty [Ok] if the command was sent successfully, or a [ClientError] if there was an error
119 ///
120 /// # example
121 /// ```no_run
122 /// client.client_get_status("client_id".to_string()).await.expect("could not get client status");
123 /// ```
124 pub async fn client_get_status(&mut self, id: String) -> Result<(), ClientError> {
125 self
126 .send(Method::ClientGetStatus {
127 params: client::GetStatusParams { id },
128 })
129 .await
130 }
131
132 /// set the volume and mute status of a client
133 ///
134 /// wrapper for sending a [ClientSetVolume](Method::ClientSetVolume) command
135 ///
136 /// # args
137 /// `id`: [String] - the id of the client
138 /// `volume`: [client::ClientVolume] - the volume and mute status to set
139 ///
140 /// # returns
141 /// an empty [Ok] if the command was sent successfully, or a [ClientError] if there was an error
142 ///
143 /// # example
144 /// ```no_run
145 /// client.client_set_mute("client_id".to_string(), client::ClientVolume { mute: false, volume: 50 }).await.expect("could not set client mute");
146 /// ```
147 pub async fn client_set_volume(&mut self, id: String, volume: client::ClientVolume) -> Result<(), ClientError> {
148 self
149 .send(Method::ClientSetVolume {
150 params: client::SetVolumeParams { id, volume },
151 })
152 .await
153 }
154
155 /// set the latency of a client
156 ///
157 /// wrapper for sending a [ClientSetLatency](Method::ClientSetLatency) command
158 ///
159 /// # args
160 /// `id`: [String] - the id of the client
161 /// `latency`: [usize] - the latency to set
162 ///
163 /// # returns
164 /// an empty [Ok] if the command was sent successfully, or a [ClientError] if there was an error
165 ///
166 /// # example
167 /// ```no_run
168 /// client.client_set_latency("client_id".to_string(), 100).await.expect("could not set client latency");
169 /// ```
170 pub async fn client_set_latency(&mut self, id: String, latency: usize) -> Result<(), ClientError> {
171 self
172 .send(Method::ClientSetLatency {
173 params: client::SetLatencyParams { id, latency },
174 })
175 .await
176 }
177
178 /// set the name of a client
179 ///
180 /// wrapper for sending a [ClientSetName](Method::ClientSetName) command
181 ///
182 /// # args
183 /// `id`: [String] - the id of the client
184 /// `name`: [String] - the name to set
185 ///
186 /// # returns
187 /// an empty [Ok] if the command was sent successfully, or a [ClientError] if there was an error
188 ///
189 /// # example
190 /// ```no_run
191 /// client.client_set_name("client_id".to_string(), "new_name".to_string()).await.expect("could not set client name");
192 /// ```
193 pub async fn client_set_name(&mut self, id: String, name: String) -> Result<(), ClientError> {
194 self
195 .send(Method::ClientSetName {
196 params: client::SetNameParams { id, name },
197 })
198 .await
199 }
200
201 // group methods
202 /// request the current status of a group from the Snapcast server
203 ///
204 /// wrapper for sending a [GroupGetStatus](Method::GroupGetStatus) command
205 ///
206 /// # args
207 /// `id`: [String] - the id of the group
208 ///
209 /// # returns
210 /// an empty [Ok] if the command was sent successfully, or a [ClientError] if there was an error
211 ///
212 /// # example
213 /// ```no_run
214 /// client.group_get_status("group_id".to_string()).await.expect("could not get group status");
215 /// ```
216 pub async fn group_get_status(&mut self, id: String) -> Result<(), ClientError> {
217 self
218 .send(Method::GroupGetStatus {
219 params: group::GetStatusParams { id },
220 })
221 .await
222 }
223
224 /// set the mute status of a group
225 ///
226 /// wrapper for sending a [GroupSetMute](Method::GroupSetMute) command
227 ///
228 /// # args
229 /// `id`: [String] - the id of the group
230 /// `mute`: [bool] - the mute status to set
231 ///
232 /// # returns
233 /// an empty [Ok] if the command was sent successfully, or a [ClientError] if there was an error
234 ///
235 /// # example
236 /// ```no_run
237 /// client.group_set_mute("group_id".to_string(), true).await.expect("could not set group mute");
238 /// ```
239 pub async fn group_set_mute(&mut self, id: String, mute: bool) -> Result<(), ClientError> {
240 self
241 .send(Method::GroupSetMute {
242 params: group::SetMuteParams { id, mute },
243 })
244 .await
245 }
246
247 /// set the stream of a group
248 ///
249 /// wrapper for sending a [GroupSetStream](Method::GroupSetStream) command
250 ///
251 /// # args
252 /// `id`: [String] - the id of the group
253 /// `stream_id`: [String] - the id of the stream to set
254 ///
255 /// # returns
256 /// an empty [Ok] if the command was sent successfully, or a [ClientError] if there was an error
257 ///
258 /// # example
259 /// ```no_run
260 /// client.group_set_stream("group_id".to_string(), "stream_id".to_string()).await.expect("could not set group stream");
261 /// ```
262 pub async fn group_set_stream(&mut self, id: String, stream_id: String) -> Result<(), ClientError> {
263 self
264 .send(Method::GroupSetStream {
265 params: group::SetStreamParams { id, stream_id },
266 })
267 .await
268 }
269
270 /// set the clients of a group
271 ///
272 /// wrapper for sending a [GroupSetClients](Method::GroupSetClients) command
273 ///
274 /// # args
275 /// `id`: [String] - the id of the group
276 /// `clients`: [Vec]<[String]> - the ids of the clients to set
277 ///
278 /// # returns
279 /// an empty [Ok] if the command was sent successfully, or a [ClientError] if there was an error
280 ///
281 /// # example
282 /// ```no_run
283 /// client.group_set_clients("group_id".to_string(), vec!["client_id".to_string()]).await.expect("could not set group clients");
284 /// ```
285 pub async fn group_set_clients(&mut self, id: String, clients: Vec<String>) -> Result<(), ClientError> {
286 self
287 .send(Method::GroupSetClients {
288 params: group::SetClientsParams { id, clients },
289 })
290 .await
291 }
292
293 /// set the name of a group
294 ///
295 /// wrapper for sending a [GroupSetName](Method::GroupSetName) command
296 ///
297 /// # args
298 /// `id`: [String] - the id of the group
299 /// `name`: [String] - the name to set
300 ///
301 /// # returns
302 /// an empty [Ok] if the command was sent successfully, or a [ClientError] if there was an error
303 ///
304 /// # example
305 /// ```no_run
306 /// client.group_set_name("group_id".to_string(), "new_name".to_string()).await.expect("could not set group name");
307 /// ```
308 pub async fn group_set_name(&mut self, id: String, name: String) -> Result<(), ClientError> {
309 self
310 .send(Method::GroupSetName {
311 params: group::SetNameParams { id, name },
312 })
313 .await
314 }
315
316 // server methods
317 /// request the rpc version of the Snapcast server
318 ///
319 /// wrapper for sending a [ServerGetStatus](Method::ServerGetStatus) command
320 ///
321 /// # returns
322 /// an empty [Ok] if the command was sent successfully, or a [ClientError] if there was an error
323 ///
324 /// # example
325 /// ```no_run
326 /// client.server_get_rpc_version().await.expect("could not get server rpc version");
327 /// ```
328 pub async fn server_get_rpc_version(&mut self) -> Result<(), ClientError> {
329 self.send(Method::ServerGetRPCVersion).await
330 }
331
332 /// request the current status of the Snapcast server, this is a full refresh for state
333 ///
334 /// wrapper for sending a [ServerGetStatus](Method::ServerGetStatus) command
335 ///
336 /// # returns
337 /// an empty [Ok] if the command was sent successfully, or a [ClientError] if there was an error
338 ///
339 /// # example
340 /// ```no_run
341 /// client.server_get_status().await.expect("could not get server status");
342 /// ```
343 pub async fn server_get_status(&mut self) -> Result<(), ClientError> {
344 self.send(Method::ServerGetStatus).await
345 }
346
347 /// forcefully delete a client from the Snapcast server
348 ///
349 /// wrapper for sending a [ServerDeleteClient](Method::ServerDeleteClient) command
350 ///
351 /// # args
352 /// `id`: [String] - the id of the client to delete
353 ///
354 /// # returns
355 /// an empty [Ok] if the command was sent successfully, or a [ClientError] if there was an error
356 ///
357 /// # example
358 /// ```no_run
359 /// client.server_delete_client("client_id".to_string()).await.expect("could not delete client");
360 /// ```
361 pub async fn server_delete_client(&mut self, id: String) -> Result<(), ClientError> {
362 self
363 .send(Method::ServerDeleteClient {
364 params: server::DeleteClientParams { id },
365 })
366 .await
367 }
368
369 // stream methods
370 /// add a new stream to the Snapcast server
371 ///
372 /// wrapper for sending a [StreamAddStream](Method::StreamAddStream) command
373 ///
374 /// # args
375 /// `stream_uri`: [String] - the uri of the stream to add
376 ///
377 /// # returns
378 /// an empty [Ok] if the command was sent successfully, or a [ClientError] if there was an error
379 ///
380 /// # example
381 /// ```no_run
382 /// client.stream_add_stream("librespot:///usr/bin/librespot?name=Spotify&...".to_string()).await.expect("could not add stream");
383 /// ```
384 pub async fn stream_add_stream(&mut self, stream_uri: String) -> Result<(), ClientError> {
385 self
386 .send(Method::StreamAddStream {
387 params: stream::AddStreamParams { stream_uri },
388 })
389 .await
390 }
391
392 /// remove a stream from the Snapcast server
393 ///
394 /// wrapper for sending a [StreamRemoveStream](Method::StreamRemoveStream) command
395 ///
396 /// # args
397 /// `id`: [String] - the id of the stream to remove
398 ///
399 /// # returns
400 /// an empty [Ok] if the command was sent successfully, or a [ClientError] if there was an error
401 ///
402 /// # example
403 /// ```no_run
404 /// client.stream_remove_stream("stream_id".to_string()).await.expect("could not remove stream");
405 /// ```
406 pub async fn stream_remove_stream(&mut self, id: String) -> Result<(), ClientError> {
407 self
408 .send(Method::StreamRemoveStream {
409 params: stream::RemoveStreamParams { id },
410 })
411 .await
412 }
413
414 /// control a stream on the Snapcast server
415 ///
416 /// wrapper for sending a [StreamControl](Method::StreamControl) command
417 ///
418 /// # args
419 /// `id`: [String] - the id of the stream to control
420 /// `command`: [stream::ControlCommand] - the command to send to the stream
421 ///
422 /// # returns
423 /// an empty [Ok] if the command was sent successfully, or a [ClientError] if there was an error
424 ///
425 /// # example
426 /// ```no_run
427 /// client.stream_control("stream_id".to_string(), stream::ControlCommand::Pause).await.expect("could not control stream");
428 /// ```
429 pub async fn stream_control(&mut self, id: String, command: stream::ControlCommand) -> Result<(), ClientError> {
430 self
431 .send(Method::StreamControl {
432 params: stream::ControlParams { id, command },
433 })
434 .await
435 }
436
437 /// set the property of a stream on the Snapcast server
438 ///
439 /// wrapper for sending a [StreamSetProperty](Method::StreamSetProperty) command
440 ///
441 /// # args
442 /// `id`: [String] - the id of the stream to control
443 /// `properties`: [stream::SetPropertyProperties] - the properties to set on the stream
444 ///
445 /// # returns
446 /// an empty [Ok] if the command was sent successfully, or a [ClientError] if there was an error
447 ///
448 /// # example
449 /// ```no_run
450 /// client.stream_set_property("stream_id".to_string(), stream::SetPropertyProperties::Shuffle(true)).await.expect("could not set stream property");
451 /// ```
452 pub async fn stream_set_property(
453 &mut self,
454 id: String,
455 properties: stream::SetPropertyProperties,
456 ) -> Result<(), ClientError> {
457 self
458 .send(Method::StreamSetProperty {
459 params: stream::SetPropertyParams { id, properties },
460 })
461 .await
462 }
463}
464
465#[derive(Debug, Clone, Default)]
466struct Communication {
467 purgatory: SentRequests,
468}
469
470impl Communication {
471 async fn init(address: std::net::SocketAddr) -> (Sender, Receiver) {
472 use futures::stream::StreamExt;
473 use tokio_util::codec::Decoder;
474
475 let client = Self::default();
476
477 tracing::info!("connecting to snapcast server at {}", address);
478 let stream = StubbornTcpStream::connect(address).await.unwrap();
479 let (writer, reader) = client.framed(stream).split();
480
481 (writer, reader)
482 }
483}
484
485impl tokio_util::codec::Decoder for Communication {
486 type Item = Message;
487 type Error = ClientError;
488
489 fn decode(&mut self, src: &mut tokio_util::bytes::BytesMut) -> Result<Option<Self::Item>, Self::Error> {
490 use tokio_util::bytes::Buf;
491
492 if src.is_empty() {
493 return Ok(None);
494 }
495
496 // tracing::trace!("decoding: {:?}", src);
497
498 let lf_pos = src.as_ref().iter().position(|b| *b == b'\n');
499 if let Some(lf_pos) = lf_pos {
500 let data = src.split_to(lf_pos);
501 src.advance(1);
502
503 tracing::debug!("received complete message with length: {}", data.len());
504 let message = std::str::from_utf8(&data).unwrap();
505 tracing::trace!("completed json message: {:?}", message);
506
507 let message = Message::try_from((message, &self.purgatory))?;
508 tracing::trace!("completed deserialized message: {:?}", message);
509
510 return Ok(Some(message));
511 }
512
513 Ok(None)
514 }
515}
516
517impl tokio_util::codec::Encoder<Method> for Communication {
518 type Error = ClientError;
519
520 fn encode(&mut self, method: Method, dst: &mut tokio_util::bytes::BytesMut) -> Result<(), Self::Error> {
521 tracing::trace!("encoding: {:?}", method);
522
523 let id = Uuid::new_v4();
524 let command: RequestMethod = (&method).into();
525 tracing::debug!("sending command: {:?}", command);
526 self.purgatory.insert(id, command);
527
528 let data = Request {
529 id,
530 jsonrpc: "2.0".to_string(),
531 method,
532 };
533
534 let string: String = data.try_into()?;
535 let string = format!("{}\n", string);
536 tracing::trace!("sending: {:?}", string);
537
538 dst.extend_from_slice(string.as_bytes());
539
540 Ok(())
541 }
542}
543
544/// Error type for the Snapcast client
545#[derive(Debug, thiserror::Error)]
546pub enum ClientError {
547 /// An error returned by the Snapcast server
548 #[error("Snapcast error: {0}")]
549 Snapcast(#[from] errors::SnapcastError),
550 /// An error communicating with the Snapcast server
551 #[error("Communication error: {0}")]
552 Io(#[from] std::io::Error),
553 /// An error deserializing a message from the Snapcast server
554 #[error("Deserialization error: {0}")]
555 Deserialization(#[from] protocol::DeserializationError),
556 /// An error deserializing the json from the Snapcast server
557 #[error("JSON Deserialization error: {0}")]
558 JsonDeserialization(#[from] serde_json::Error),
559 /// An unknown error
560 #[error("Unknown error: {0}")]
561 Unknown(String),
562}