snapcast_control/
state.rs

use dashmap::{mapref::entry::Entry, DashMap};
use std::{
  cell::OnceCell,
  collections::HashSet,
  sync::{Arc, OnceLock, RwLock},
};
7
8use crate::protocol::{
9  client::{Client, ClientVolume},
10  group::Group,
11  server::{Server, ServerDetails},
12  stream::{Stream, StreamProperties},
13  Notification, SnapcastResult,
14};
15
/// group details as stored in the state object
///
/// this contains a [HashSet] of client ids instead of a vec of client structs;
/// the client structs themselves are stored separately in the state's `clients` map,
/// keyed by client id
#[derive(Clone, Debug)]
pub struct StateGroup {
  /// group id (also the key this group is stored under in the state's `groups` map)
  pub id: String,
  /// group name
  pub name: String,
  /// id of the stream assigned to this group
  pub stream_id: String,
  /// group muted status
  pub muted: bool,
  /// set of client ids in group
  pub clients: HashSet<String>,
}
32
/// A reference-counted handle to a [State]; every clone points at the same underlying state
///
/// An [Arc] can only be sent to or shared with other threads when the value it wraps
/// is both `Send` and `Sync`, so cross-thread use depends on every field of [State]
/// meeting that requirement.
pub type WrappedState = Arc<State>;
35
36/// The state of the Snapcast server, automatically kept up to date by the client
37#[derive(Debug, Default)]
38pub struct State {
39  /// host and snapserver information
40  pub server: OnceCell<RwLock<ServerDetails>>,
41  /// group information keyed by group id
42  pub groups: DashMap<String, StateGroup>,
43  /// client information keyed by client id
44  pub clients: DashMap<String, Client>,
45  /// stream information keyed by stream id \
46  /// None indicates that the stream was recently added and properties have not been fetched
47  pub streams: DashMap<String, Option<Stream>>,
48}
49
/// A single-field change to a client that is already stored in the state
///
/// Consumed by `State::client_partial_update`, which overwrites the matching field of
/// the stored client's `config`.
enum ClientPartialUpdate {
  /// new value for `config.volume`
  Volume(ClientVolume),
  /// new value for `config.latency`
  Latency(usize),
  /// new value for `config.name`
  Name(String),
}
55
/// A single-field change to a group that is already stored in the state
///
/// Consumed by `State::group_partial_update`, which overwrites the matching field of
/// the stored [StateGroup].
enum GroupPartialUpdate {
  /// new value for `name`
  Name(String),
  /// new value for `stream_id`
  StreamId(String),
  /// new value for `muted`
  Muted(bool),
}
61
/// A single-field change to a stream that is already stored in the state
///
/// Consumed by `State::stream_partial_update`.
enum StreamPartialUpdate {
  /// new value for the stream's `properties`; only applied when the stored entry is `Some`
  Properties(StreamProperties),
}
65
66impl State {
67  pub(crate) fn handle_result(&self, data: SnapcastResult) {
68    match data {
69      // client
70      SnapcastResult::ClientGetStatus(result) => self.client_upsert(result.client),
71      SnapcastResult::ClientSetVolume(id, result) => {
72        self.client_partial_update(id, ClientPartialUpdate::Volume(result.volume))
73      }
74      SnapcastResult::ClientSetLatency(id, result) => {
75        self.client_partial_update(id, ClientPartialUpdate::Latency(result.latency))
76      }
77      SnapcastResult::ClientSetName(id, result) => {
78        self.client_partial_update(id, ClientPartialUpdate::Name(result.name))
79      }
80
81      // group
82      SnapcastResult::GroupGetStatus(result) => {
83        let clients = result.group.clients.iter().map(|c| c.id.clone()).collect();
84        self.group_upsert(result.group, clients);
85      }
86      SnapcastResult::GroupSetMute(id, result) => self.group_partial_update(id, GroupPartialUpdate::Muted(result.mute)),
87      SnapcastResult::GroupSetStream(id, result) => {
88        self.group_partial_update(id, GroupPartialUpdate::StreamId(result.stream_id))
89      }
90      SnapcastResult::GroupSetName(id, result) => self.group_partial_update(id, GroupPartialUpdate::Name(result.name)),
91      SnapcastResult::GroupSetClients(result) => self.full_server_upsert(result.server),
92
93      // server
94      SnapcastResult::ServerGetRPCVersion(_) => {}
95      SnapcastResult::ServerGetStatus(result) => self.full_server_upsert(result.server),
96      SnapcastResult::ServerDeleteClient(result) => self.full_server_upsert(result.server),
97
98      // stream
99      SnapcastResult::StreamAddStream(result) => self.stream_upsert(result.id, None),
100      SnapcastResult::StreamRemoveStream(result) => {
101        self.streams.remove(&result.id);
102      }
103      SnapcastResult::StreamControl(_) => {}
104      SnapcastResult::StreamSetProperty(_) => {}
105    };
106  }
107
108  pub(crate) fn handle_notification(&self, data: Notification) {
109    match data {
110      // client
111      Notification::ClientOnConnect { params } => self.client_upsert(params.client),
112      Notification::ClientOnDisconnect { params } => self.client_remove(params.id),
113      Notification::ClientOnVolumeChanged { params } => {
114        self.client_partial_update(params.id, ClientPartialUpdate::Volume(params.volume))
115      }
116      Notification::ClientOnLatencyChanged { params } => {
117        self.client_partial_update(params.id, ClientPartialUpdate::Latency(params.latency))
118      }
119      Notification::ClientOnNameChanged { params } => {
120        self.client_partial_update(params.id, ClientPartialUpdate::Name(params.name))
121      }
122
123      // group
124      Notification::GroupOnMute { params } => {
125        self.group_partial_update(params.id, GroupPartialUpdate::Muted(params.mute))
126      }
127      Notification::GroupOnStreamChanged { params } => {
128        self.group_partial_update(params.id, GroupPartialUpdate::StreamId(params.stream_id))
129      }
130      Notification::GroupOnNameChanged { params } => {
131        self.group_partial_update(params.id, GroupPartialUpdate::Name(params.name))
132      }
133
134      // server
135      Notification::ServerOnUpdate { params } => self.full_server_upsert(params.server),
136
137      // stream
138      Notification::StreamOnUpdate { params } => self.stream_upsert(params.stream.id.clone(), Some(params.stream)),
139      Notification::StreamOnProperties { params } => {
140        self.stream_partial_update(params.id, StreamPartialUpdate::Properties(params.properties))
141      }
142    };
143  }
144
145  fn full_server_upsert(&self, data: Server) {
146    self.server_details_upsert(data.server);
147
148    let group_keys: HashSet<&str> = data.groups.iter().map(|g| &*g.id).collect();
149    self.groups.retain(|k, _| group_keys.contains(k.as_str()));
150
151    let client_keys: HashSet<&str> = data
152      .groups
153      .iter()
154      .flat_map(|g| g.clients.iter().map(|c| &*c.id))
155      .collect();
156    self.clients.retain(|k, _| client_keys.contains(k.as_str()));
157
158    for mut group in data.groups {
159      let clients: HashSet<String> = group.clients.iter().map(|c| c.id.clone()).collect();
160
161      for client in group.clients.drain(..) {
162        self.client_upsert(client);
163      }
164
165      self.group_upsert(group, clients);
166    }
167
168    let stream_keys: HashSet<&str> = data.streams.iter().map(|s| &*s.id).collect();
169    self.streams.retain(|k, _| stream_keys.contains(k.as_str()));
170
171    for stream in data.streams {
172      self.stream_upsert(stream.id.clone(), Some(stream));
173    }
174  }
175
176  // client
177  fn client_upsert(&self, client: Client) {
178    let entry = self.clients.entry(client.id.clone());
179    if let Entry::Occupied(mut entry) = entry {
180      let entry = entry.get_mut();
181      *entry = client;
182    } else {
183      entry.insert(client);
184    }
185  }
186
187  fn client_remove(&self, id: String) {
188    self.clients.remove(&id);
189  }
190
191  fn client_partial_update(&self, id: String, update: ClientPartialUpdate) {
192    let entry = self.clients.entry(id);
193    if let Entry::Occupied(mut entry) = entry {
194      let entry = entry.get_mut();
195
196      match update {
197        ClientPartialUpdate::Volume(volume) => entry.config.volume = volume,
198        ClientPartialUpdate::Latency(latency) => entry.config.latency = latency,
199        ClientPartialUpdate::Name(name) => entry.config.name = name,
200      }
201    }
202  }
203
204  // group
205  fn group_upsert(&self, group: Group, clients: HashSet<String>) {
206    let entry = self.groups.entry(group.id.clone());
207    if let Entry::Occupied(mut entry) = entry {
208      let entry = entry.get_mut();
209
210      entry.name = group.name;
211      entry.stream_id = group.stream_id;
212      entry.muted = group.muted;
213      entry.clients = clients;
214    } else {
215      entry.insert(StateGroup {
216        id: group.id.clone(),
217        name: group.name,
218        stream_id: group.stream_id.clone(),
219        muted: group.muted,
220        clients,
221      });
222    }
223  }
224
225  fn group_partial_update(&self, id: String, update: GroupPartialUpdate) {
226    let entry = self.groups.entry(id.clone());
227    if let Entry::Occupied(mut entry) = entry {
228      let entry = entry.get_mut();
229
230      match update {
231        GroupPartialUpdate::Name(name) => entry.name = name,
232        GroupPartialUpdate::Muted(muted) => entry.muted = muted,
233        GroupPartialUpdate::StreamId(stream_id) => {
234          entry.stream_id = stream_id;
235        }
236      }
237    }
238  }
239
240  // server
241  fn server_details_upsert(&self, server: ServerDetails) {
242    if self.server.get().is_none() {
243      self.server.set(RwLock::new(server)).expect("this should never fail");
244    } else {
245      let mut entry = self.server.get().unwrap().write().expect("rwlock poisoned");
246      *entry = server;
247    }
248  }
249
250  // stream
251  fn stream_upsert(&self, id: String, stream: Option<Stream>) {
252    let entry = self.streams.entry(id);
253    if let Entry::Occupied(mut entry) = entry {
254      let entry = entry.get_mut();
255      *entry = stream;
256    } else {
257      entry.insert(stream);
258    }
259  }
260
261  fn stream_partial_update(&self, id: String, update: StreamPartialUpdate) {
262    let entry = self.streams.entry(id);
263    if let Entry::Occupied(mut entry) = entry {
264      let entry = entry.get_mut();
265
266      match update {
267        StreamPartialUpdate::Properties(properties) => {
268          if let Some(entry) = entry {
269            entry.properties = Some(properties);
270          }
271        }
272      }
273    }
274  }
275}