snapcast_control/communication.rs
1use stubborn_io::StubbornTcpStream;
2use uuid::Uuid;
3
4use crate::{
5  errors,
6  protocol::{self, client, group, server, stream, Request, RequestMethod, SentRequests},
7  state::WrappedState,
8  Message, Method, ValidMessage,
9};
10
/// Write half of the framed connection: the [futures::stream::SplitSink] obtained by splitting a
/// [tokio_util::codec::Framed] TCP stream that uses [Communication] as its codec.
/// It accepts [Method] values, which the codec encodes before they are written to the socket.
type Sender =
  futures::stream::SplitSink<tokio_util::codec::Framed<StubbornTcpStream<std::net::SocketAddr>, Communication>, Method>;
/// Read half of the framed connection: the [futures::stream::SplitStream] obtained by splitting a
/// [tokio_util::codec::Framed] TCP stream that uses [Communication] as its codec.
/// It yields the items produced by the codec's decoder.
type Receiver =
  futures::stream::SplitStream<tokio_util::codec::Framed<StubbornTcpStream<std::net::SocketAddr>, Communication>>;
15
/// Struct representing a connection to a Snapcast server.
/// Contains the current state of the server and methods to interact with it.
///
/// Call `SnapcastConnection::open` to create a new connection.
///
/// Commands are written with `send` (or one of the typed wrapper methods); messages from the
/// server are read with `recv`, which also feeds results and notifications into `state`.
pub struct SnapcastConnection {
  /// The current state of the server. The state is Send + Sync, so it can be shared between threads.
  pub state: WrappedState,

  // internal
  // write half of the framed TCP connection; used by `send`
  sender: Sender,
  // read half of the framed TCP connection; used by `recv`
  receiver: Receiver,
}
28
29impl SnapcastConnection {
30  /// open a new connection to a Snapcast server
31  ///
32  /// # args
33  /// `address`: [std::net::SocketAddr] - the address of the Snapcast server
34  ///
35  /// # returns
36  /// a new [SnapcastConnection] struct
37  ///
38  /// # example
39  /// ```no_run
40  /// let mut client = SnapcastConnection::open("127.0.0.1:1705".parse().expect("could not parse socket address")).await;
41  /// ```
42  pub async fn open(address: std::net::SocketAddr) -> Self {
43    let state = WrappedState::default();
44    let (sender, receiver) = Communication::init(address).await;
45
46    Self {
47      state,
48      sender,
49      receiver,
50    }
51  }
52
53  /// send a raw command to the Snapcast server
54  ///
55  /// # args
56  /// `command`: [Method] - the command to send
57  ///
58  /// # returns
59  /// an empty [Ok] if the command was sent successfully, or a [ClientError] if there was an error
60  ///
61  /// # example
62  /// ```no_run
63  /// client.send(Method::ServerGetStatus).await.expect("could not send command");
64  /// ```
65  pub async fn send(&mut self, command: Method) -> Result<(), ClientError> {
66    use futures::SinkExt;
67
68    self.sender.send(command).await
69  }
70
71  /// receive a message from the Snapcast server
72  ///
73  /// uses a [futures::stream::Next] under the hood, so: \
74  /// creates a future that resolves to the next item in the stream
75  ///
76  /// # returns
77  /// an [Option] containing an [Ok] with a [ValidMessage] if a message was received, \
78  /// an [Option] containing an [Err] with a [ClientError] if there was an error, \
79  /// or [None] if the stream has ended
80  ///
81  /// # example
82  /// ```no_run
83  /// let message = client.recv().await.expect("could not receive message");
84  /// ```
85  pub async fn recv(&mut self) -> Option<Result<ValidMessage, ClientError>> {
86    use futures::StreamExt;
87
88    let message = self.receiver.next().await;
89
90    if let Some(Ok(message)) = message {
91      match &message {
92        Message::Error { error, .. } => return Some(Err(error.clone().into())),
93        Message::Result { result, .. } => self.state.handle_result(*result.clone()),
94        Message::Notification { method, .. } => self.state.handle_notification(*method.clone()),
95      };
96
97      Some(Ok(
98        message
99          .try_into()
100          .expect("this should never fail bc error has returned already"),
101      ))
102    } else if let Some(Err(err)) = message {
103      Some(Err(err))
104    } else {
105      None
106    }
107  }
108
109  // client methods
110  /// request the current status of a client from the Snapcast server
111  ///
112  /// wrapper for sending a [ClientGetStatus](Method::ClientGetStatus) command
113  ///
114  /// # args
115  /// `id`: [String] - the id of the client
116  ///
117  /// # returns
118  /// an empty [Ok] if the command was sent successfully, or a [ClientError] if there was an error
119  ///
120  /// # example
121  /// ```no_run
122  /// client.client_get_status("client_id".to_string()).await.expect("could not get client status");
123  /// ```
124  pub async fn client_get_status(&mut self, id: String) -> Result<(), ClientError> {
125    self
126      .send(Method::ClientGetStatus {
127        params: client::GetStatusParams { id },
128      })
129      .await
130  }
131
132  /// set the volume and mute status of a client
133  ///
134  /// wrapper for sending a [ClientSetVolume](Method::ClientSetVolume) command
135  ///
136  /// # args
137  /// `id`: [String] - the id of the client
138  /// `volume`: [client::ClientVolume] - the volume and mute status to set
139  ///
140  /// # returns
141  /// an empty [Ok] if the command was sent successfully, or a [ClientError] if there was an error
142  ///
143  /// # example
144  /// ```no_run
145  /// client.client_set_mute("client_id".to_string(), client::ClientVolume { mute: false, volume: 50 }).await.expect("could not set client mute");
146  /// ```
147  pub async fn client_set_volume(&mut self, id: String, volume: client::ClientVolume) -> Result<(), ClientError> {
148    self
149      .send(Method::ClientSetVolume {
150        params: client::SetVolumeParams { id, volume },
151      })
152      .await
153  }
154
155  /// set the latency of a client
156  ///
157  /// wrapper for sending a [ClientSetLatency](Method::ClientSetLatency) command
158  ///
159  /// # args
160  /// `id`: [String] - the id of the client
161  /// `latency`: [usize] - the latency to set
162  ///
163  /// # returns
164  /// an empty [Ok] if the command was sent successfully, or a [ClientError] if there was an error
165  ///
166  /// # example
167  /// ```no_run
168  /// client.client_set_latency("client_id".to_string(), 100).await.expect("could not set client latency");
169  /// ```
170  pub async fn client_set_latency(&mut self, id: String, latency: usize) -> Result<(), ClientError> {
171    self
172      .send(Method::ClientSetLatency {
173        params: client::SetLatencyParams { id, latency },
174      })
175      .await
176  }
177
178  /// set the name of a client
179  ///
180  /// wrapper for sending a [ClientSetName](Method::ClientSetName) command
181  ///
182  /// # args
183  /// `id`: [String] - the id of the client
184  /// `name`: [String] - the name to set
185  ///
186  /// # returns
187  /// an empty [Ok] if the command was sent successfully, or a [ClientError] if there was an error
188  ///
189  /// # example
190  /// ```no_run
191  /// client.client_set_name("client_id".to_string(), "new_name".to_string()).await.expect("could not set client name");
192  /// ```
193  pub async fn client_set_name(&mut self, id: String, name: String) -> Result<(), ClientError> {
194    self
195      .send(Method::ClientSetName {
196        params: client::SetNameParams { id, name },
197      })
198      .await
199  }
200
201  // group methods
202  /// request the current status of a group from the Snapcast server
203  ///
204  /// wrapper for sending a [GroupGetStatus](Method::GroupGetStatus) command
205  ///
206  /// # args
207  /// `id`: [String] - the id of the group
208  ///
209  /// # returns
210  /// an empty [Ok] if the command was sent successfully, or a [ClientError] if there was an error
211  ///
212  /// # example
213  /// ```no_run
214  /// client.group_get_status("group_id".to_string()).await.expect("could not get group status");
215  /// ```
216  pub async fn group_get_status(&mut self, id: String) -> Result<(), ClientError> {
217    self
218      .send(Method::GroupGetStatus {
219        params: group::GetStatusParams { id },
220      })
221      .await
222  }
223
224  /// set the mute status of a group
225  ///
226  /// wrapper for sending a [GroupSetMute](Method::GroupSetMute) command
227  ///
228  /// # args
229  /// `id`: [String] - the id of the group
230  /// `mute`: [bool] - the mute status to set
231  ///
232  /// # returns
233  /// an empty [Ok] if the command was sent successfully, or a [ClientError] if there was an error
234  ///
235  /// # example
236  /// ```no_run
237  /// client.group_set_mute("group_id".to_string(), true).await.expect("could not set group mute");
238  /// ```
239  pub async fn group_set_mute(&mut self, id: String, mute: bool) -> Result<(), ClientError> {
240    self
241      .send(Method::GroupSetMute {
242        params: group::SetMuteParams { id, mute },
243      })
244      .await
245  }
246
247  /// set the stream of a group
248  ///
249  /// wrapper for sending a [GroupSetStream](Method::GroupSetStream) command
250  ///
251  /// # args
252  /// `id`: [String] - the id of the group
253  /// `stream_id`: [String] - the id of the stream to set
254  ///
255  /// # returns
256  /// an empty [Ok] if the command was sent successfully, or a [ClientError] if there was an error
257  ///
258  /// # example
259  /// ```no_run
260  /// client.group_set_stream("group_id".to_string(), "stream_id".to_string()).await.expect("could not set group stream");
261  /// ```
262  pub async fn group_set_stream(&mut self, id: String, stream_id: String) -> Result<(), ClientError> {
263    self
264      .send(Method::GroupSetStream {
265        params: group::SetStreamParams { id, stream_id },
266      })
267      .await
268  }
269
270  /// set the clients of a group
271  ///
272  /// wrapper for sending a [GroupSetClients](Method::GroupSetClients) command
273  ///
274  /// # args
275  /// `id`: [String] - the id of the group
276  /// `clients`: [Vec]<[String]> - the ids of the clients to set
277  ///
278  /// # returns
279  /// an empty [Ok] if the command was sent successfully, or a [ClientError] if there was an error
280  ///
281  /// # example
282  /// ```no_run
283  /// client.group_set_clients("group_id".to_string(), vec!["client_id".to_string()]).await.expect("could not set group clients");
284  /// ```
285  pub async fn group_set_clients(&mut self, id: String, clients: Vec<String>) -> Result<(), ClientError> {
286    self
287      .send(Method::GroupSetClients {
288        params: group::SetClientsParams { id, clients },
289      })
290      .await
291  }
292
293  /// set the name of a group
294  ///
295  /// wrapper for sending a [GroupSetName](Method::GroupSetName) command
296  ///
297  /// # args
298  /// `id`: [String] - the id of the group
299  /// `name`: [String] - the name to set
300  ///
301  /// # returns
302  /// an empty [Ok] if the command was sent successfully, or a [ClientError] if there was an error
303  ///
304  /// # example
305  /// ```no_run
306  /// client.group_set_name("group_id".to_string(), "new_name".to_string()).await.expect("could not set group name");
307  /// ```
308  pub async fn group_set_name(&mut self, id: String, name: String) -> Result<(), ClientError> {
309    self
310      .send(Method::GroupSetName {
311        params: group::SetNameParams { id, name },
312      })
313      .await
314  }
315
316  // server methods
317  /// request the rpc version of the Snapcast server
318  ///
319  /// wrapper for sending a [ServerGetStatus](Method::ServerGetStatus) command
320  ///
321  /// # returns
322  /// an empty [Ok] if the command was sent successfully, or a [ClientError] if there was an error
323  ///
324  /// # example
325  /// ```no_run
326  /// client.server_get_rpc_version().await.expect("could not get server rpc version");
327  /// ```
328  pub async fn server_get_rpc_version(&mut self) -> Result<(), ClientError> {
329    self.send(Method::ServerGetRPCVersion).await
330  }
331
332  /// request the current status of the Snapcast server, this is a full refresh for state
333  ///
334  /// wrapper for sending a [ServerGetStatus](Method::ServerGetStatus) command
335  ///
336  /// # returns
337  /// an empty [Ok] if the command was sent successfully, or a [ClientError] if there was an error
338  ///
339  /// # example
340  /// ```no_run
341  /// client.server_get_status().await.expect("could not get server status");
342  /// ```
343  pub async fn server_get_status(&mut self) -> Result<(), ClientError> {
344    self.send(Method::ServerGetStatus).await
345  }
346
347  /// forcefully delete a client from the Snapcast server
348  ///
349  /// wrapper for sending a [ServerDeleteClient](Method::ServerDeleteClient) command
350  ///
351  /// # args
352  /// `id`: [String] - the id of the client to delete
353  ///
354  /// # returns
355  /// an empty [Ok] if the command was sent successfully, or a [ClientError] if there was an error
356  ///
357  /// # example
358  /// ```no_run
359  /// client.server_delete_client("client_id".to_string()).await.expect("could not delete client");
360  /// ```
361  pub async fn server_delete_client(&mut self, id: String) -> Result<(), ClientError> {
362    self
363      .send(Method::ServerDeleteClient {
364        params: server::DeleteClientParams { id },
365      })
366      .await
367  }
368
369  // stream methods
370  /// add a new stream to the Snapcast server
371  ///
372  /// wrapper for sending a [StreamAddStream](Method::StreamAddStream) command
373  ///
374  /// # args
375  /// `stream_uri`: [String] - the uri of the stream to add
376  ///
377  /// # returns
378  /// an empty [Ok] if the command was sent successfully, or a [ClientError] if there was an error
379  ///
380  /// # example
381  /// ```no_run
382  /// client.stream_add_stream("librespot:///usr/bin/librespot?name=Spotify&...".to_string()).await.expect("could not add stream");
383  /// ```
384  pub async fn stream_add_stream(&mut self, stream_uri: String) -> Result<(), ClientError> {
385    self
386      .send(Method::StreamAddStream {
387        params: stream::AddStreamParams { stream_uri },
388      })
389      .await
390  }
391
392  /// remove a stream from the Snapcast server
393  ///
394  /// wrapper for sending a [StreamRemoveStream](Method::StreamRemoveStream) command
395  ///
396  /// # args
397  /// `id`: [String] - the id of the stream to remove
398  ///
399  /// # returns
400  /// an empty [Ok] if the command was sent successfully, or a [ClientError] if there was an error
401  ///
402  /// # example
403  /// ```no_run
404  /// client.stream_remove_stream("stream_id".to_string()).await.expect("could not remove stream");
405  /// ```
406  pub async fn stream_remove_stream(&mut self, id: String) -> Result<(), ClientError> {
407    self
408      .send(Method::StreamRemoveStream {
409        params: stream::RemoveStreamParams { id },
410      })
411      .await
412  }
413
414  /// control a stream on the Snapcast server
415  ///
416  /// wrapper for sending a [StreamControl](Method::StreamControl) command
417  ///
418  /// # args
419  /// `id`: [String] - the id of the stream to control
420  /// `command`: [stream::ControlCommand] - the command to send to the stream
421  ///
422  /// # returns
423  /// an empty [Ok] if the command was sent successfully, or a [ClientError] if there was an error
424  ///
425  /// # example
426  /// ```no_run
427  /// client.stream_control("stream_id".to_string(), stream::ControlCommand::Pause).await.expect("could not control stream");
428  /// ```
429  pub async fn stream_control(&mut self, id: String, command: stream::ControlCommand) -> Result<(), ClientError> {
430    self
431      .send(Method::StreamControl {
432        params: stream::ControlParams { id, command },
433      })
434      .await
435  }
436
437  /// set the property of a stream on the Snapcast server
438  ///
439  /// wrapper for sending a [StreamSetProperty](Method::StreamSetProperty) command
440  ///
441  /// # args
442  /// `id`: [String] - the id of the stream to control
443  /// `properties`: [stream::SetPropertyProperties] - the properties to set on the stream
444  ///
445  /// # returns
446  /// an empty [Ok] if the command was sent successfully, or a [ClientError] if there was an error
447  ///
448  /// # example
449  /// ```no_run
450  /// client.stream_set_property("stream_id".to_string(), stream::SetPropertyProperties::Shuffle(true)).await.expect("could not set stream property");
451  /// ```
452  pub async fn stream_set_property(
453    &mut self,
454    id: String,
455    properties: stream::SetPropertyProperties,
456  ) -> Result<(), ClientError> {
457    self
458      .send(Method::StreamSetProperty {
459        params: stream::SetPropertyParams { id, properties },
460      })
461      .await
462  }
463}
464
/// Codec used to frame the TCP connection to the Snapcast server.
///
/// Implements both [tokio_util::codec::Encoder] (for [Method]) and [tokio_util::codec::Decoder]
/// (producing [Message]).
#[derive(Debug, Clone, Default)]
struct Communication {
  // requests that have been sent: the encoder inserts each request's id and method here,
  // and the decoder passes this collection along when parsing an incoming message
  purgatory: SentRequests,
}
469
470impl Communication {
471  async fn init(address: std::net::SocketAddr) -> (Sender, Receiver) {
472    use futures::stream::StreamExt;
473    use tokio_util::codec::Decoder;
474
475    let client = Self::default();
476
477    tracing::info!("connecting to snapcast server at {}", address);
478    let stream = StubbornTcpStream::connect(address).await.unwrap();
479    let (writer, reader) = client.framed(stream).split();
480
481    (writer, reader)
482  }
483}
484
485impl tokio_util::codec::Decoder for Communication {
486  type Item = Message;
487  type Error = ClientError;
488
489  fn decode(&mut self, src: &mut tokio_util::bytes::BytesMut) -> Result<Option<Self::Item>, Self::Error> {
490    use tokio_util::bytes::Buf;
491
492    if src.is_empty() {
493      return Ok(None);
494    }
495
496    // tracing::trace!("decoding: {:?}", src);
497
498    let lf_pos = src.as_ref().iter().position(|b| *b == b'\n');
499    if let Some(lf_pos) = lf_pos {
500      let data = src.split_to(lf_pos);
501      src.advance(1);
502
503      tracing::debug!("received complete message with length: {}", data.len());
504      let message = std::str::from_utf8(&data).unwrap();
505      tracing::trace!("completed json message: {:?}", message);
506
507      let message = Message::try_from((message, &self.purgatory))?;
508      tracing::trace!("completed deserialized message: {:?}", message);
509
510      return Ok(Some(message));
511    }
512
513    Ok(None)
514  }
515}
516
517impl tokio_util::codec::Encoder<Method> for Communication {
518  type Error = ClientError;
519
520  fn encode(&mut self, method: Method, dst: &mut tokio_util::bytes::BytesMut) -> Result<(), Self::Error> {
521    tracing::trace!("encoding: {:?}", method);
522
523    let id = Uuid::new_v4();
524    let command: RequestMethod = (&method).into();
525    tracing::debug!("sending command: {:?}", command);
526    self.purgatory.insert(id, command);
527
528    let data = Request {
529      id,
530      jsonrpc: "2.0".to_string(),
531      method,
532    };
533
534    let string: String = data.try_into()?;
535    let string = format!("{}\n", string);
536    tracing::trace!("sending: {:?}", string);
537
538    dst.extend_from_slice(string.as_bytes());
539
540    Ok(())
541  }
542}
543
/// Error type for the Snapcast client
///
/// This is the error type of the `Communication` codec (both encoding and decoding) and of the
/// `send` / `recv` methods on [SnapcastConnection]. The `#[from]` variants can be produced
/// with `?` or `.into()` from their source error types.
#[derive(Debug, thiserror::Error)]
pub enum ClientError {
  /// An error returned by the Snapcast server
  #[error("Snapcast error: {0}")]
  Snapcast(#[from] errors::SnapcastError),
  /// An error communicating with the Snapcast server
  #[error("Communication error: {0}")]
  Io(#[from] std::io::Error),
  /// An error deserializing a message from the Snapcast server
  #[error("Deserialization error: {0}")]
  Deserialization(#[from] protocol::DeserializationError),
  /// An error deserializing the json from the Snapcast server
  #[error("JSON Deserialization error: {0}")]
  JsonDeserialization(#[from] serde_json::Error),
  /// An unknown error, described only by a message
  #[error("Unknown error: {0}")]
  Unknown(String),
}