snapcast_control/communication.rs
1use stubborn_io::StubbornTcpStream;
2use uuid::Uuid;
3
4use crate::{
5 errors,
6 protocol::{self, client, group, server, stream, Request, RequestMethod, SentRequests, SnapcastDeserializer},
7 state::WrappedState,
8 Message, Method, ValidMessage,
9};
10
11type Sender =
12 futures::stream::SplitSink<tokio_util::codec::Framed<StubbornTcpStream<std::net::SocketAddr>, Communication>, Method>;
13type Receiver =
14 futures::stream::SplitStream<tokio_util::codec::Framed<StubbornTcpStream<std::net::SocketAddr>, Communication>>;
15
16/// Struct representing a connection to a Snapcast server.
17/// Contains the current state of the server and methods to interact with it.
18///
19/// call `SnapcastConnection::open` to create a new connection.
20pub struct SnapcastConnection {
21 /// The current state of the server. The state is Send + Sync, so it can be shared between threads.
22 pub state: WrappedState,
23
24 // internal
25 sender: Sender,
26 receiver: Receiver,
27}
28
29impl SnapcastConnection {
30 /// open a new connection to a Snapcast server
31 ///
32 /// # args
33 /// `address`: [std::net::SocketAddr] - the address of the Snapcast server
34 ///
35 /// # returns
36 /// a new [SnapcastConnection] struct
37 ///
38 /// # example
39 /// ```no_run
40 /// let mut client = SnapcastConnection::open("127.0.0.1:1705".parse().expect("could not parse socket address")).await.expect("could not connect to server");
41 /// ```
42 pub async fn open(address: std::net::SocketAddr) -> Result<Self, std::io::Error> {
43 let state = WrappedState::default();
44 let (sender, receiver) = Communication::init(address).await?;
45
46 Ok(Self {
47 state,
48 sender,
49 receiver,
50 })
51 }
52
53 /// send a raw command to the Snapcast server
54 ///
55 /// # args
56 /// `command`: [Method] - the command to send
57 ///
58 /// # returns
59 /// an empty [Ok] if the command was sent successfully, or a [ClientError] if there was an error
60 ///
61 /// # example
62 /// ```no_run
63 /// client.send(Method::ServerGetStatus).await.expect("could not send command");
64 /// ```
65 pub async fn send(&mut self, command: Method) -> Result<(), ClientError> {
66 use futures::SinkExt;
67
68 self.sender.send(command).await
69 }
70
71 /// receive messages from the Snapcast server
72 ///
73 /// uses a [futures::stream::Next] under the hood, so: \
74 /// creates a future that resolves to the next batch of messages in the stream
75 ///
76 /// # returns
77 /// an [Option] containing a [Vec] of [Result]s, one for each message in the batch, \
78 /// or [None] if the stream has ended. Transport-level errors result in a single-element
79 /// vec containing the error.
80 ///
81 /// # example
82 /// ```ignore
83 /// if let Some(messages) = client.recv().await {
84 /// for result in messages {
85 /// match result {
86 /// Ok(message) => { /* handle message */ }
87 /// Err(err) => { /* handle error */ }
88 /// }
89 /// }
90 /// }
91 /// ```
92 pub async fn recv(&mut self) -> Option<Vec<Result<ValidMessage, ClientError>>> {
93 use futures::StreamExt;
94
95 let messages = self.receiver.next().await;
96
97 match messages {
98 Some(Ok(messages)) => {
99 let mut results = Vec::with_capacity(messages.len());
100
101 for message in messages {
102 match &message {
103 Message::Error { error, .. } => {
104 results.push(Err(error.clone().into()));
105 }
106 Message::Result { result, .. } => {
107 self.state.handle_result(*result.clone());
108 results.push(Ok(
109 message.try_into().expect("Result can always convert to ValidMessage"),
110 ));
111 }
112 Message::Notification { method, .. } => {
113 self.state.handle_notification(*method.clone());
114 results.push(Ok(
115 message
116 .try_into()
117 .expect("Notification can always convert to ValidMessage"),
118 ));
119 }
120 }
121 }
122
123 Some(results)
124 }
125 Some(Err(err)) => Some(vec![Err(err)]),
126 None => None,
127 }
128 }
129
130 // client methods
131 /// request the current status of a client from the Snapcast server
132 ///
133 /// wrapper for sending a [ClientGetStatus](Method::ClientGetStatus) command
134 ///
135 /// # args
136 /// `id`: [String] - the id of the client
137 ///
138 /// # returns
139 /// an empty [Ok] if the command was sent successfully, or a [ClientError] if there was an error
140 ///
141 /// # example
142 /// ```no_run
143 /// client.client_get_status("client_id".to_string()).await.expect("could not get client status");
144 /// ```
145 pub async fn client_get_status(&mut self, id: String) -> Result<(), ClientError> {
146 self
147 .send(Method::ClientGetStatus {
148 params: client::GetStatusParams { id },
149 })
150 .await
151 }
152
153 /// set the volume and mute status of a client
154 ///
155 /// wrapper for sending a [ClientSetVolume](Method::ClientSetVolume) command
156 ///
157 /// # args
158 /// `id`: [String] - the id of the client
159 /// `volume`: [client::ClientVolume] - the volume and mute status to set
160 ///
161 /// # returns
162 /// an empty [Ok] if the command was sent successfully, or a [ClientError] if there was an error
163 ///
164 /// # example
165 /// ```no_run
166 /// client.client_set_mute("client_id".to_string(), client::ClientVolume { mute: false, volume: 50 }).await.expect("could not set client mute");
167 /// ```
168 pub async fn client_set_volume(&mut self, id: String, volume: client::ClientVolume) -> Result<(), ClientError> {
169 self
170 .send(Method::ClientSetVolume {
171 params: client::SetVolumeParams { id, volume },
172 })
173 .await
174 }
175
176 /// set the latency of a client
177 ///
178 /// wrapper for sending a [ClientSetLatency](Method::ClientSetLatency) command
179 ///
180 /// # args
181 /// `id`: [String] - the id of the client
182 /// `latency`: [usize] - the latency to set
183 ///
184 /// # returns
185 /// an empty [Ok] if the command was sent successfully, or a [ClientError] if there was an error
186 ///
187 /// # example
188 /// ```no_run
189 /// client.client_set_latency("client_id".to_string(), 100).await.expect("could not set client latency");
190 /// ```
191 pub async fn client_set_latency(&mut self, id: String, latency: usize) -> Result<(), ClientError> {
192 self
193 .send(Method::ClientSetLatency {
194 params: client::SetLatencyParams { id, latency },
195 })
196 .await
197 }
198
199 /// set the name of a client
200 ///
201 /// wrapper for sending a [ClientSetName](Method::ClientSetName) command
202 ///
203 /// # args
204 /// `id`: [String] - the id of the client
205 /// `name`: [String] - the name to set
206 ///
207 /// # returns
208 /// an empty [Ok] if the command was sent successfully, or a [ClientError] if there was an error
209 ///
210 /// # example
211 /// ```no_run
212 /// client.client_set_name("client_id".to_string(), "new_name".to_string()).await.expect("could not set client name");
213 /// ```
214 pub async fn client_set_name(&mut self, id: String, name: String) -> Result<(), ClientError> {
215 self
216 .send(Method::ClientSetName {
217 params: client::SetNameParams { id, name },
218 })
219 .await
220 }
221
222 // group methods
223 /// request the current status of a group from the Snapcast server
224 ///
225 /// wrapper for sending a [GroupGetStatus](Method::GroupGetStatus) command
226 ///
227 /// # args
228 /// `id`: [String] - the id of the group
229 ///
230 /// # returns
231 /// an empty [Ok] if the command was sent successfully, or a [ClientError] if there was an error
232 ///
233 /// # example
234 /// ```no_run
235 /// client.group_get_status("group_id".to_string()).await.expect("could not get group status");
236 /// ```
237 pub async fn group_get_status(&mut self, id: String) -> Result<(), ClientError> {
238 self
239 .send(Method::GroupGetStatus {
240 params: group::GetStatusParams { id },
241 })
242 .await
243 }
244
245 /// set the mute status of a group
246 ///
247 /// wrapper for sending a [GroupSetMute](Method::GroupSetMute) command
248 ///
249 /// # args
250 /// `id`: [String] - the id of the group
251 /// `mute`: [bool] - the mute status to set
252 ///
253 /// # returns
254 /// an empty [Ok] if the command was sent successfully, or a [ClientError] if there was an error
255 ///
256 /// # example
257 /// ```no_run
258 /// client.group_set_mute("group_id".to_string(), true).await.expect("could not set group mute");
259 /// ```
260 pub async fn group_set_mute(&mut self, id: String, mute: bool) -> Result<(), ClientError> {
261 self
262 .send(Method::GroupSetMute {
263 params: group::SetMuteParams { id, mute },
264 })
265 .await
266 }
267
268 /// set the stream of a group
269 ///
270 /// wrapper for sending a [GroupSetStream](Method::GroupSetStream) command
271 ///
272 /// # args
273 /// `id`: [String] - the id of the group
274 /// `stream_id`: [String] - the id of the stream to set
275 ///
276 /// # returns
277 /// an empty [Ok] if the command was sent successfully, or a [ClientError] if there was an error
278 ///
279 /// # example
280 /// ```no_run
281 /// client.group_set_stream("group_id".to_string(), "stream_id".to_string()).await.expect("could not set group stream");
282 /// ```
283 pub async fn group_set_stream(&mut self, id: String, stream_id: String) -> Result<(), ClientError> {
284 self
285 .send(Method::GroupSetStream {
286 params: group::SetStreamParams { id, stream_id },
287 })
288 .await
289 }
290
291 /// set the clients of a group
292 ///
293 /// wrapper for sending a [GroupSetClients](Method::GroupSetClients) command
294 ///
295 /// # args
296 /// `id`: [String] - the id of the group
297 /// `clients`: [Vec]<[String]> - the ids of the clients to set
298 ///
299 /// # returns
300 /// an empty [Ok] if the command was sent successfully, or a [ClientError] if there was an error
301 ///
302 /// # example
303 /// ```no_run
304 /// client.group_set_clients("group_id".to_string(), vec!["client_id".to_string()]).await.expect("could not set group clients");
305 /// ```
306 pub async fn group_set_clients(&mut self, id: String, clients: Vec<String>) -> Result<(), ClientError> {
307 self
308 .send(Method::GroupSetClients {
309 params: group::SetClientsParams { id, clients },
310 })
311 .await
312 }
313
314 /// set the name of a group
315 ///
316 /// wrapper for sending a [GroupSetName](Method::GroupSetName) command
317 ///
318 /// # args
319 /// `id`: [String] - the id of the group
320 /// `name`: [String] - the name to set
321 ///
322 /// # returns
323 /// an empty [Ok] if the command was sent successfully, or a [ClientError] if there was an error
324 ///
325 /// # example
326 /// ```no_run
327 /// client.group_set_name("group_id".to_string(), "new_name".to_string()).await.expect("could not set group name");
328 /// ```
329 pub async fn group_set_name(&mut self, id: String, name: String) -> Result<(), ClientError> {
330 self
331 .send(Method::GroupSetName {
332 params: group::SetNameParams { id, name },
333 })
334 .await
335 }
336
337 // server methods
338 /// request the rpc version of the Snapcast server
339 ///
340 /// wrapper for sending a [ServerGetStatus](Method::ServerGetStatus) command
341 ///
342 /// # returns
343 /// an empty [Ok] if the command was sent successfully, or a [ClientError] if there was an error
344 ///
345 /// # example
346 /// ```no_run
347 /// client.server_get_rpc_version().await.expect("could not get server rpc version");
348 /// ```
349 pub async fn server_get_rpc_version(&mut self) -> Result<(), ClientError> {
350 self.send(Method::ServerGetRPCVersion).await
351 }
352
353 /// request the current status of the Snapcast server, this is a full refresh for state
354 ///
355 /// wrapper for sending a [ServerGetStatus](Method::ServerGetStatus) command
356 ///
357 /// # returns
358 /// an empty [Ok] if the command was sent successfully, or a [ClientError] if there was an error
359 ///
360 /// # example
361 /// ```no_run
362 /// client.server_get_status().await.expect("could not get server status");
363 /// ```
364 pub async fn server_get_status(&mut self) -> Result<(), ClientError> {
365 self.send(Method::ServerGetStatus).await
366 }
367
368 /// forcefully delete a client from the Snapcast server
369 ///
370 /// wrapper for sending a [ServerDeleteClient](Method::ServerDeleteClient) command
371 ///
372 /// # args
373 /// `id`: [String] - the id of the client to delete
374 ///
375 /// # returns
376 /// an empty [Ok] if the command was sent successfully, or a [ClientError] if there was an error
377 ///
378 /// # example
379 /// ```no_run
380 /// client.server_delete_client("client_id".to_string()).await.expect("could not delete client");
381 /// ```
382 pub async fn server_delete_client(&mut self, id: String) -> Result<(), ClientError> {
383 self
384 .send(Method::ServerDeleteClient {
385 params: server::DeleteClientParams { id },
386 })
387 .await
388 }
389
390 // stream methods
391 /// add a new stream to the Snapcast server
392 ///
393 /// wrapper for sending a [StreamAddStream](Method::StreamAddStream) command
394 ///
395 /// # args
396 /// `stream_uri`: [String] - the uri of the stream to add
397 ///
398 /// # returns
399 /// an empty [Ok] if the command was sent successfully, or a [ClientError] if there was an error
400 ///
401 /// # example
402 /// ```no_run
403 /// client.stream_add_stream("librespot:///usr/bin/librespot?name=Spotify&...".to_string()).await.expect("could not add stream");
404 /// ```
405 pub async fn stream_add_stream(&mut self, stream_uri: String) -> Result<(), ClientError> {
406 self
407 .send(Method::StreamAddStream {
408 params: stream::AddStreamParams { stream_uri },
409 })
410 .await
411 }
412
413 /// remove a stream from the Snapcast server
414 ///
415 /// wrapper for sending a [StreamRemoveStream](Method::StreamRemoveStream) command
416 ///
417 /// # args
418 /// `id`: [String] - the id of the stream to remove
419 ///
420 /// # returns
421 /// an empty [Ok] if the command was sent successfully, or a [ClientError] if there was an error
422 ///
423 /// # example
424 /// ```no_run
425 /// client.stream_remove_stream("stream_id".to_string()).await.expect("could not remove stream");
426 /// ```
427 pub async fn stream_remove_stream(&mut self, id: String) -> Result<(), ClientError> {
428 self
429 .send(Method::StreamRemoveStream {
430 params: stream::RemoveStreamParams { id },
431 })
432 .await
433 }
434
435 /// control a stream on the Snapcast server
436 ///
437 /// wrapper for sending a [StreamControl](Method::StreamControl) command
438 ///
439 /// # args
440 /// `id`: [String] - the id of the stream to control
441 /// `command`: [stream::ControlCommand] - the command to send to the stream
442 ///
443 /// # returns
444 /// an empty [Ok] if the command was sent successfully, or a [ClientError] if there was an error
445 ///
446 /// # example
447 /// ```no_run
448 /// client.stream_control("stream_id".to_string(), stream::ControlCommand::Pause).await.expect("could not control stream");
449 /// ```
450 pub async fn stream_control(&mut self, id: String, command: stream::ControlCommand) -> Result<(), ClientError> {
451 self
452 .send(Method::StreamControl {
453 params: stream::ControlParams { id, command },
454 })
455 .await
456 }
457
458 /// set the property of a stream on the Snapcast server
459 ///
460 /// wrapper for sending a [StreamSetProperty](Method::StreamSetProperty) command
461 ///
462 /// # args
463 /// `id`: [String] - the id of the stream to control
464 /// `properties`: [stream::SetPropertyProperties] - the properties to set on the stream
465 ///
466 /// # returns
467 /// an empty [Ok] if the command was sent successfully, or a [ClientError] if there was an error
468 ///
469 /// # example
470 /// ```no_run
471 /// client.stream_set_property("stream_id".to_string(), stream::SetPropertyProperties::Shuffle(true)).await.expect("could not set stream property");
472 /// ```
473 pub async fn stream_set_property(
474 &mut self,
475 id: String,
476 properties: stream::SetPropertyProperties,
477 ) -> Result<(), ClientError> {
478 self
479 .send(Method::StreamSetProperty {
480 params: stream::SetPropertyParams { id, properties },
481 })
482 .await
483 }
484}
485
486#[derive(Debug, Clone, Default)]
487struct Communication {
488 purgatory: SentRequests,
489}
490
491impl Communication {
492 async fn init(address: std::net::SocketAddr) -> Result<(Sender, Receiver), std::io::Error> {
493 use futures::stream::StreamExt;
494 use tokio_util::codec::Decoder;
495
496 let client = Self::default();
497
498 tracing::info!("connecting to snapcast server at {}", address);
499 let stream = StubbornTcpStream::connect(address).await?;
500 let (writer, reader) = client.framed(stream).split();
501
502 Ok((writer, reader))
503 }
504}
505
506impl tokio_util::codec::Decoder for Communication {
507 type Item = Vec<Message>;
508 type Error = ClientError;
509
510 fn decode(&mut self, src: &mut tokio_util::bytes::BytesMut) -> Result<Option<Self::Item>, Self::Error> {
511 use tokio_util::bytes::Buf;
512
513 if src.is_empty() {
514 return Ok(None);
515 }
516
517 let lf_pos = src.as_ref().iter().position(|b| *b == b'\n');
518 if let Some(lf_pos) = lf_pos {
519 let data = src.split_to(lf_pos);
520 src.advance(1);
521
522 tracing::debug!("received complete message with length: {}", data.len());
523 let message = std::str::from_utf8(&data)?;
524 tracing::trace!("completed json message: {:?}", message);
525
526 let messages = SnapcastDeserializer::de(message, &self.purgatory)?;
527 tracing::trace!("completed deserialized messages: {:?}", messages);
528
529 if messages.is_empty() {
530 return Ok(None);
531 }
532
533 return Ok(Some(messages));
534 }
535
536 Ok(None)
537 }
538}
539
540impl tokio_util::codec::Encoder<Method> for Communication {
541 type Error = ClientError;
542
543 fn encode(&mut self, method: Method, dst: &mut tokio_util::bytes::BytesMut) -> Result<(), Self::Error> {
544 tracing::trace!("encoding: {:?}", method);
545
546 let id = Uuid::new_v4();
547 let command: RequestMethod = (&method).into();
548 tracing::debug!("sending command: {:?}", command);
549 self.purgatory.insert(id, command);
550
551 let data = Request {
552 id,
553 jsonrpc: "2.0".to_string(),
554 method,
555 };
556
557 let string: String = data.try_into()?;
558 let string = format!("{}\n", string);
559 tracing::trace!("sending: {:?}", string);
560
561 dst.extend_from_slice(string.as_bytes());
562
563 Ok(())
564 }
565}
566
567/// Error type for the Snapcast client
568#[derive(Debug, thiserror::Error)]
569pub enum ClientError {
570 /// An error returned by the Snapcast server
571 #[error("Snapcast error: {0}")]
572 Snapcast(#[from] errors::SnapcastError),
573 /// An error communicating with the Snapcast server
574 #[error("Communication error: {0}")]
575 Io(#[from] std::io::Error),
576 /// An error decoding a UTF-8 string from the Snapcast server
577 #[error("UTF-8 decoding error: {0}")]
578 Utf8(#[from] std::str::Utf8Error),
579 /// An error deserializing a message from the Snapcast server
580 #[error("Deserialization error: {0}")]
581 Deserialization(#[from] protocol::DeserializationError),
582 /// An error deserializing the json from the Snapcast server
583 #[error("JSON Deserialization error: {0}")]
584 JsonDeserialization(#[from] serde_json::Error),
585 /// An unknown error
586 #[error("Unknown error: {0}")]
587 Unknown(String),
588}