snapcast_control/
communication.rs

1use stubborn_io::StubbornTcpStream;
2use uuid::Uuid;
3
4use crate::{
5  errors,
6  protocol::{self, client, group, server, stream, Request, RequestMethod, SentRequests, SnapcastDeserializer},
7  state::WrappedState,
8  Message, Method, ValidMessage,
9};
10
11type Sender =
12  futures::stream::SplitSink<tokio_util::codec::Framed<StubbornTcpStream<std::net::SocketAddr>, Communication>, Method>;
13type Receiver =
14  futures::stream::SplitStream<tokio_util::codec::Framed<StubbornTcpStream<std::net::SocketAddr>, Communication>>;
15
16/// Struct representing a connection to a Snapcast server.
17/// Contains the current state of the server and methods to interact with it.
18///
19/// call `SnapcastConnection::open` to create a new connection.
20pub struct SnapcastConnection {
21  /// The current state of the server. The state is Send + Sync, so it can be shared between threads.
22  pub state: WrappedState,
23
24  // internal
25  sender: Sender,
26  receiver: Receiver,
27}
28
29impl SnapcastConnection {
30  /// open a new connection to a Snapcast server
31  ///
32  /// # args
33  /// `address`: [std::net::SocketAddr] - the address of the Snapcast server
34  ///
35  /// # returns
36  /// a new [SnapcastConnection] struct
37  ///
38  /// # example
39  /// ```no_run
40  /// let mut client = SnapcastConnection::open("127.0.0.1:1705".parse().expect("could not parse socket address")).await.expect("could not connect to server");
41  /// ```
42  pub async fn open(address: std::net::SocketAddr) -> Result<Self, std::io::Error> {
43    let state = WrappedState::default();
44    let (sender, receiver) = Communication::init(address).await?;
45
46    Ok(Self {
47      state,
48      sender,
49      receiver,
50    })
51  }
52
53  /// send a raw command to the Snapcast server
54  ///
55  /// # args
56  /// `command`: [Method] - the command to send
57  ///
58  /// # returns
59  /// an empty [Ok] if the command was sent successfully, or a [ClientError] if there was an error
60  ///
61  /// # example
62  /// ```no_run
63  /// client.send(Method::ServerGetStatus).await.expect("could not send command");
64  /// ```
65  pub async fn send(&mut self, command: Method) -> Result<(), ClientError> {
66    use futures::SinkExt;
67
68    self.sender.send(command).await
69  }
70
71  /// receive messages from the Snapcast server
72  ///
73  /// uses a [futures::stream::Next] under the hood, so: \
74  /// creates a future that resolves to the next batch of messages in the stream
75  ///
76  /// # returns
77  /// an [Option] containing a [Vec] of [Result]s, one for each message in the batch, \
78  /// or [None] if the stream has ended. Transport-level errors result in a single-element
79  /// vec containing the error.
80  ///
81  /// # example
82  /// ```ignore
83  /// if let Some(messages) = client.recv().await {
84  ///   for result in messages {
85  ///     match result {
86  ///       Ok(message) => { /* handle message */ }
87  ///       Err(err) => { /* handle error */ }
88  ///     }
89  ///   }
90  /// }
91  /// ```
92  pub async fn recv(&mut self) -> Option<Vec<Result<ValidMessage, ClientError>>> {
93    use futures::StreamExt;
94
95    let messages = self.receiver.next().await;
96
97    match messages {
98      Some(Ok(messages)) => {
99        let mut results = Vec::with_capacity(messages.len());
100
101        for message in messages {
102          match &message {
103            Message::Error { error, .. } => {
104              results.push(Err(error.clone().into()));
105            }
106            Message::Result { result, .. } => {
107              self.state.handle_result(*result.clone());
108              results.push(Ok(
109                message.try_into().expect("Result can always convert to ValidMessage"),
110              ));
111            }
112            Message::Notification { method, .. } => {
113              self.state.handle_notification(*method.clone());
114              results.push(Ok(
115                message
116                  .try_into()
117                  .expect("Notification can always convert to ValidMessage"),
118              ));
119            }
120          }
121        }
122
123        Some(results)
124      }
125      Some(Err(err)) => Some(vec![Err(err)]),
126      None => None,
127    }
128  }
129
130  // client methods
131  /// request the current status of a client from the Snapcast server
132  ///
133  /// wrapper for sending a [ClientGetStatus](Method::ClientGetStatus) command
134  ///
135  /// # args
136  /// `id`: [String] - the id of the client
137  ///
138  /// # returns
139  /// an empty [Ok] if the command was sent successfully, or a [ClientError] if there was an error
140  ///
141  /// # example
142  /// ```no_run
143  /// client.client_get_status("client_id".to_string()).await.expect("could not get client status");
144  /// ```
145  pub async fn client_get_status(&mut self, id: String) -> Result<(), ClientError> {
146    self
147      .send(Method::ClientGetStatus {
148        params: client::GetStatusParams { id },
149      })
150      .await
151  }
152
153  /// set the volume and mute status of a client
154  ///
155  /// wrapper for sending a [ClientSetVolume](Method::ClientSetVolume) command
156  ///
157  /// # args
158  /// `id`: [String] - the id of the client
159  /// `volume`: [client::ClientVolume] - the volume and mute status to set
160  ///
161  /// # returns
162  /// an empty [Ok] if the command was sent successfully, or a [ClientError] if there was an error
163  ///
164  /// # example
165  /// ```no_run
166  /// client.client_set_mute("client_id".to_string(), client::ClientVolume { mute: false, volume: 50 }).await.expect("could not set client mute");
167  /// ```
168  pub async fn client_set_volume(&mut self, id: String, volume: client::ClientVolume) -> Result<(), ClientError> {
169    self
170      .send(Method::ClientSetVolume {
171        params: client::SetVolumeParams { id, volume },
172      })
173      .await
174  }
175
176  /// set the latency of a client
177  ///
178  /// wrapper for sending a [ClientSetLatency](Method::ClientSetLatency) command
179  ///
180  /// # args
181  /// `id`: [String] - the id of the client
182  /// `latency`: [usize] - the latency to set
183  ///
184  /// # returns
185  /// an empty [Ok] if the command was sent successfully, or a [ClientError] if there was an error
186  ///
187  /// # example
188  /// ```no_run
189  /// client.client_set_latency("client_id".to_string(), 100).await.expect("could not set client latency");
190  /// ```
191  pub async fn client_set_latency(&mut self, id: String, latency: usize) -> Result<(), ClientError> {
192    self
193      .send(Method::ClientSetLatency {
194        params: client::SetLatencyParams { id, latency },
195      })
196      .await
197  }
198
199  /// set the name of a client
200  ///
201  /// wrapper for sending a [ClientSetName](Method::ClientSetName) command
202  ///
203  /// # args
204  /// `id`: [String] - the id of the client
205  /// `name`: [String] - the name to set
206  ///
207  /// # returns
208  /// an empty [Ok] if the command was sent successfully, or a [ClientError] if there was an error
209  ///
210  /// # example
211  /// ```no_run
212  /// client.client_set_name("client_id".to_string(), "new_name".to_string()).await.expect("could not set client name");
213  /// ```
214  pub async fn client_set_name(&mut self, id: String, name: String) -> Result<(), ClientError> {
215    self
216      .send(Method::ClientSetName {
217        params: client::SetNameParams { id, name },
218      })
219      .await
220  }
221
222  // group methods
223  /// request the current status of a group from the Snapcast server
224  ///
225  /// wrapper for sending a [GroupGetStatus](Method::GroupGetStatus) command
226  ///
227  /// # args
228  /// `id`: [String] - the id of the group
229  ///
230  /// # returns
231  /// an empty [Ok] if the command was sent successfully, or a [ClientError] if there was an error
232  ///
233  /// # example
234  /// ```no_run
235  /// client.group_get_status("group_id".to_string()).await.expect("could not get group status");
236  /// ```
237  pub async fn group_get_status(&mut self, id: String) -> Result<(), ClientError> {
238    self
239      .send(Method::GroupGetStatus {
240        params: group::GetStatusParams { id },
241      })
242      .await
243  }
244
245  /// set the mute status of a group
246  ///
247  /// wrapper for sending a [GroupSetMute](Method::GroupSetMute) command
248  ///
249  /// # args
250  /// `id`: [String] - the id of the group
251  /// `mute`: [bool] - the mute status to set
252  ///
253  /// # returns
254  /// an empty [Ok] if the command was sent successfully, or a [ClientError] if there was an error
255  ///
256  /// # example
257  /// ```no_run
258  /// client.group_set_mute("group_id".to_string(), true).await.expect("could not set group mute");
259  /// ```
260  pub async fn group_set_mute(&mut self, id: String, mute: bool) -> Result<(), ClientError> {
261    self
262      .send(Method::GroupSetMute {
263        params: group::SetMuteParams { id, mute },
264      })
265      .await
266  }
267
268  /// set the stream of a group
269  ///
270  /// wrapper for sending a [GroupSetStream](Method::GroupSetStream) command
271  ///
272  /// # args
273  /// `id`: [String] - the id of the group
274  /// `stream_id`: [String] - the id of the stream to set
275  ///
276  /// # returns
277  /// an empty [Ok] if the command was sent successfully, or a [ClientError] if there was an error
278  ///
279  /// # example
280  /// ```no_run
281  /// client.group_set_stream("group_id".to_string(), "stream_id".to_string()).await.expect("could not set group stream");
282  /// ```
283  pub async fn group_set_stream(&mut self, id: String, stream_id: String) -> Result<(), ClientError> {
284    self
285      .send(Method::GroupSetStream {
286        params: group::SetStreamParams { id, stream_id },
287      })
288      .await
289  }
290
291  /// set the clients of a group
292  ///
293  /// wrapper for sending a [GroupSetClients](Method::GroupSetClients) command
294  ///
295  /// # args
296  /// `id`: [String] - the id of the group
297  /// `clients`: [Vec]<[String]> - the ids of the clients to set
298  ///
299  /// # returns
300  /// an empty [Ok] if the command was sent successfully, or a [ClientError] if there was an error
301  ///
302  /// # example
303  /// ```no_run
304  /// client.group_set_clients("group_id".to_string(), vec!["client_id".to_string()]).await.expect("could not set group clients");
305  /// ```
306  pub async fn group_set_clients(&mut self, id: String, clients: Vec<String>) -> Result<(), ClientError> {
307    self
308      .send(Method::GroupSetClients {
309        params: group::SetClientsParams { id, clients },
310      })
311      .await
312  }
313
314  /// set the name of a group
315  ///
316  /// wrapper for sending a [GroupSetName](Method::GroupSetName) command
317  ///
318  /// # args
319  /// `id`: [String] - the id of the group
320  /// `name`: [String] - the name to set
321  ///
322  /// # returns
323  /// an empty [Ok] if the command was sent successfully, or a [ClientError] if there was an error
324  ///
325  /// # example
326  /// ```no_run
327  /// client.group_set_name("group_id".to_string(), "new_name".to_string()).await.expect("could not set group name");
328  /// ```
329  pub async fn group_set_name(&mut self, id: String, name: String) -> Result<(), ClientError> {
330    self
331      .send(Method::GroupSetName {
332        params: group::SetNameParams { id, name },
333      })
334      .await
335  }
336
337  // server methods
338  /// request the rpc version of the Snapcast server
339  ///
340  /// wrapper for sending a [ServerGetStatus](Method::ServerGetStatus) command
341  ///
342  /// # returns
343  /// an empty [Ok] if the command was sent successfully, or a [ClientError] if there was an error
344  ///
345  /// # example
346  /// ```no_run
347  /// client.server_get_rpc_version().await.expect("could not get server rpc version");
348  /// ```
349  pub async fn server_get_rpc_version(&mut self) -> Result<(), ClientError> {
350    self.send(Method::ServerGetRPCVersion).await
351  }
352
353  /// request the current status of the Snapcast server, this is a full refresh for state
354  ///
355  /// wrapper for sending a [ServerGetStatus](Method::ServerGetStatus) command
356  ///
357  /// # returns
358  /// an empty [Ok] if the command was sent successfully, or a [ClientError] if there was an error
359  ///
360  /// # example
361  /// ```no_run
362  /// client.server_get_status().await.expect("could not get server status");
363  /// ```
364  pub async fn server_get_status(&mut self) -> Result<(), ClientError> {
365    self.send(Method::ServerGetStatus).await
366  }
367
368  /// forcefully delete a client from the Snapcast server
369  ///
370  /// wrapper for sending a [ServerDeleteClient](Method::ServerDeleteClient) command
371  ///
372  /// # args
373  /// `id`: [String] - the id of the client to delete
374  ///
375  /// # returns
376  /// an empty [Ok] if the command was sent successfully, or a [ClientError] if there was an error
377  ///
378  /// # example
379  /// ```no_run
380  /// client.server_delete_client("client_id".to_string()).await.expect("could not delete client");
381  /// ```
382  pub async fn server_delete_client(&mut self, id: String) -> Result<(), ClientError> {
383    self
384      .send(Method::ServerDeleteClient {
385        params: server::DeleteClientParams { id },
386      })
387      .await
388  }
389
390  // stream methods
391  /// add a new stream to the Snapcast server
392  ///
393  /// wrapper for sending a [StreamAddStream](Method::StreamAddStream) command
394  ///
395  /// # args
396  /// `stream_uri`: [String] - the uri of the stream to add
397  ///
398  /// # returns
399  /// an empty [Ok] if the command was sent successfully, or a [ClientError] if there was an error
400  ///
401  /// # example
402  /// ```no_run
403  /// client.stream_add_stream("librespot:///usr/bin/librespot?name=Spotify&...".to_string()).await.expect("could not add stream");
404  /// ```
405  pub async fn stream_add_stream(&mut self, stream_uri: String) -> Result<(), ClientError> {
406    self
407      .send(Method::StreamAddStream {
408        params: stream::AddStreamParams { stream_uri },
409      })
410      .await
411  }
412
413  /// remove a stream from the Snapcast server
414  ///
415  /// wrapper for sending a [StreamRemoveStream](Method::StreamRemoveStream) command
416  ///
417  /// # args
418  /// `id`: [String] - the id of the stream to remove
419  ///
420  /// # returns
421  /// an empty [Ok] if the command was sent successfully, or a [ClientError] if there was an error
422  ///
423  /// # example
424  /// ```no_run
425  /// client.stream_remove_stream("stream_id".to_string()).await.expect("could not remove stream");
426  /// ```
427  pub async fn stream_remove_stream(&mut self, id: String) -> Result<(), ClientError> {
428    self
429      .send(Method::StreamRemoveStream {
430        params: stream::RemoveStreamParams { id },
431      })
432      .await
433  }
434
435  /// control a stream on the Snapcast server
436  ///
437  /// wrapper for sending a [StreamControl](Method::StreamControl) command
438  ///
439  /// # args
440  /// `id`: [String] - the id of the stream to control
441  /// `command`: [stream::ControlCommand] - the command to send to the stream
442  ///
443  /// # returns
444  /// an empty [Ok] if the command was sent successfully, or a [ClientError] if there was an error
445  ///
446  /// # example
447  /// ```no_run
448  /// client.stream_control("stream_id".to_string(), stream::ControlCommand::Pause).await.expect("could not control stream");
449  /// ```
450  pub async fn stream_control(&mut self, id: String, command: stream::ControlCommand) -> Result<(), ClientError> {
451    self
452      .send(Method::StreamControl {
453        params: stream::ControlParams { id, command },
454      })
455      .await
456  }
457
458  /// set the property of a stream on the Snapcast server
459  ///
460  /// wrapper for sending a [StreamSetProperty](Method::StreamSetProperty) command
461  ///
462  /// # args
463  /// `id`: [String] - the id of the stream to control
464  /// `properties`: [stream::SetPropertyProperties] - the properties to set on the stream
465  ///
466  /// # returns
467  /// an empty [Ok] if the command was sent successfully, or a [ClientError] if there was an error
468  ///
469  /// # example
470  /// ```no_run
471  /// client.stream_set_property("stream_id".to_string(), stream::SetPropertyProperties::Shuffle(true)).await.expect("could not set stream property");
472  /// ```
473  pub async fn stream_set_property(
474    &mut self,
475    id: String,
476    properties: stream::SetPropertyProperties,
477  ) -> Result<(), ClientError> {
478    self
479      .send(Method::StreamSetProperty {
480        params: stream::SetPropertyParams { id, properties },
481      })
482      .await
483  }
484}
485
486#[derive(Debug, Clone, Default)]
487struct Communication {
488  purgatory: SentRequests,
489}
490
491impl Communication {
492  async fn init(address: std::net::SocketAddr) -> Result<(Sender, Receiver), std::io::Error> {
493    use futures::stream::StreamExt;
494    use tokio_util::codec::Decoder;
495
496    let client = Self::default();
497
498    tracing::info!("connecting to snapcast server at {}", address);
499    let stream = StubbornTcpStream::connect(address).await?;
500    let (writer, reader) = client.framed(stream).split();
501
502    Ok((writer, reader))
503  }
504}
505
506impl tokio_util::codec::Decoder for Communication {
507  type Item = Vec<Message>;
508  type Error = ClientError;
509
510  fn decode(&mut self, src: &mut tokio_util::bytes::BytesMut) -> Result<Option<Self::Item>, Self::Error> {
511    use tokio_util::bytes::Buf;
512
513    if src.is_empty() {
514      return Ok(None);
515    }
516
517    let lf_pos = src.as_ref().iter().position(|b| *b == b'\n');
518    if let Some(lf_pos) = lf_pos {
519      let data = src.split_to(lf_pos);
520      src.advance(1);
521
522      tracing::debug!("received complete message with length: {}", data.len());
523      let message = std::str::from_utf8(&data)?;
524      tracing::trace!("completed json message: {:?}", message);
525
526      let messages = SnapcastDeserializer::de(message, &self.purgatory)?;
527      tracing::trace!("completed deserialized messages: {:?}", messages);
528
529      if messages.is_empty() {
530        return Ok(None);
531      }
532
533      return Ok(Some(messages));
534    }
535
536    Ok(None)
537  }
538}
539
540impl tokio_util::codec::Encoder<Method> for Communication {
541  type Error = ClientError;
542
543  fn encode(&mut self, method: Method, dst: &mut tokio_util::bytes::BytesMut) -> Result<(), Self::Error> {
544    tracing::trace!("encoding: {:?}", method);
545
546    let id = Uuid::new_v4();
547    let command: RequestMethod = (&method).into();
548    tracing::debug!("sending command: {:?}", command);
549    self.purgatory.insert(id, command);
550
551    let data = Request {
552      id,
553      jsonrpc: "2.0".to_string(),
554      method,
555    };
556
557    let string: String = data.try_into()?;
558    let string = format!("{}\n", string);
559    tracing::trace!("sending: {:?}", string);
560
561    dst.extend_from_slice(string.as_bytes());
562
563    Ok(())
564  }
565}
566
567/// Error type for the Snapcast client
568#[derive(Debug, thiserror::Error)]
569pub enum ClientError {
570  /// An error returned by the Snapcast server
571  #[error("Snapcast error: {0}")]
572  Snapcast(#[from] errors::SnapcastError),
573  /// An error communicating with the Snapcast server
574  #[error("Communication error: {0}")]
575  Io(#[from] std::io::Error),
576  /// An error decoding a UTF-8 string from the Snapcast server
577  #[error("UTF-8 decoding error: {0}")]
578  Utf8(#[from] std::str::Utf8Error),
579  /// An error deserializing a message from the Snapcast server
580  #[error("Deserialization error: {0}")]
581  Deserialization(#[from] protocol::DeserializationError),
582  /// An error deserializing the json from the Snapcast server
583  #[error("JSON Deserialization error: {0}")]
584  JsonDeserialization(#[from] serde_json::Error),
585  /// An unknown error
586  #[error("Unknown error: {0}")]
587  Unknown(String),
588}