1use dashmap::{mapref::entry::Entry, DashMap};
2use std::{
3 cell::OnceCell,
4 collections::HashSet,
5 sync::{Arc, RwLock},
6};
7
8use crate::protocol::{
9 client::{Client, ClientVolume},
10 group::Group,
11 server::{Server, ServerDetails},
12 stream::{Stream, StreamProperties},
13 Notification, SnapcastResult,
14};
15
16#[derive(Clone, Debug)]
20pub struct StateGroup {
21 pub id: String,
23 pub name: String,
25 pub stream_id: String,
27 pub muted: bool,
29 pub clients: HashSet<String>,
31}
32
33pub type WrappedState = Arc<State>;
35
36#[derive(Debug, Default)]
38pub struct State {
39 pub server: OnceCell<RwLock<ServerDetails>>,
41 pub groups: DashMap<String, StateGroup>,
43 pub clients: DashMap<String, Client>,
45 pub streams: DashMap<String, Option<Stream>>,
48}
49
50enum ClientPartialUpdate {
51 Volume(ClientVolume),
52 Latency(usize),
53 Name(String),
54}
55
56enum GroupPartialUpdate {
57 Name(String),
58 StreamId(String),
59 Muted(bool),
60}
61
62enum StreamPartialUpdate {
63 Properties(StreamProperties),
64}
65
66impl State {
67 pub(crate) fn handle_result(&self, data: SnapcastResult) {
68 match data {
69 SnapcastResult::ClientGetStatus(result) => self.client_upsert(result.client),
71 SnapcastResult::ClientSetVolume(id, result) => {
72 self.client_partial_update(id, ClientPartialUpdate::Volume(result.volume))
73 }
74 SnapcastResult::ClientSetLatency(id, result) => {
75 self.client_partial_update(id, ClientPartialUpdate::Latency(result.latency))
76 }
77 SnapcastResult::ClientSetName(id, result) => {
78 self.client_partial_update(id, ClientPartialUpdate::Name(result.name))
79 }
80
81 SnapcastResult::GroupGetStatus(result) => {
83 let clients = result.group.clients.iter().map(|c| c.id.clone()).collect();
84 self.group_upsert(result.group, clients);
85 }
86 SnapcastResult::GroupSetMute(id, result) => self.group_partial_update(id, GroupPartialUpdate::Muted(result.mute)),
87 SnapcastResult::GroupSetStream(id, result) => {
88 self.group_partial_update(id, GroupPartialUpdate::StreamId(result.stream_id))
89 }
90 SnapcastResult::GroupSetName(id, result) => self.group_partial_update(id, GroupPartialUpdate::Name(result.name)),
91 SnapcastResult::GroupSetClients(result) => self.full_server_upsert(result.server),
92
93 SnapcastResult::ServerGetRPCVersion(_) => {}
95 SnapcastResult::ServerGetStatus(result) => self.full_server_upsert(result.server),
96 SnapcastResult::ServerDeleteClient(result) => self.full_server_upsert(result.server),
97
98 SnapcastResult::StreamAddStream(result) => self.stream_upsert(result.id, None),
100 SnapcastResult::StreamRemoveStream(result) => {
101 self.streams.remove(&result.id);
102 }
103 SnapcastResult::StreamControl(_) => {}
104 SnapcastResult::StreamSetProperty(_) => {}
105 };
106 }
107
108 pub(crate) fn handle_notification(&self, data: Notification) {
109 match data {
110 Notification::ClientOnConnect { params } => self.client_upsert(params.client),
112 Notification::ClientOnDisconnect { params } => self.client_remove(params.id),
113 Notification::ClientOnVolumeChanged { params } => {
114 self.client_partial_update(params.id, ClientPartialUpdate::Volume(params.volume))
115 }
116 Notification::ClientOnLatencyChanged { params } => {
117 self.client_partial_update(params.id, ClientPartialUpdate::Latency(params.latency))
118 }
119 Notification::ClientOnNameChanged { params } => {
120 self.client_partial_update(params.id, ClientPartialUpdate::Name(params.name))
121 }
122
123 Notification::GroupOnMute { params } => {
125 self.group_partial_update(params.id, GroupPartialUpdate::Muted(params.mute))
126 }
127 Notification::GroupOnStreamChanged { params } => {
128 self.group_partial_update(params.id, GroupPartialUpdate::StreamId(params.stream_id))
129 }
130 Notification::GroupOnNameChanged { params } => {
131 self.group_partial_update(params.id, GroupPartialUpdate::Name(params.name))
132 }
133
134 Notification::ServerOnUpdate { params } => self.full_server_upsert(params.server),
136
137 Notification::StreamOnUpdate { params } => self.stream_upsert(params.stream.id.clone(), Some(params.stream)),
139 Notification::StreamOnProperties { params } => {
140 self.stream_partial_update(params.id, StreamPartialUpdate::Properties(params.properties))
141 }
142 };
143 }
144
145 fn full_server_upsert(&self, data: Server) {
146 self.server_details_upsert(data.server);
147
148 let group_keys: HashSet<&str> = data.groups.iter().map(|g| &*g.id).collect();
149 self.groups.retain(|k, _| group_keys.contains(k.as_str()));
150
151 let client_keys: HashSet<&str> = data
152 .groups
153 .iter()
154 .flat_map(|g| g.clients.iter().map(|c| &*c.id))
155 .collect();
156 self.clients.retain(|k, _| client_keys.contains(k.as_str()));
157
158 for mut group in data.groups {
159 let clients: HashSet<String> = group.clients.iter().map(|c| c.id.clone()).collect();
160
161 for client in group.clients.drain(..) {
162 self.client_upsert(client);
163 }
164
165 self.group_upsert(group, clients);
166 }
167
168 let stream_keys: HashSet<&str> = data.streams.iter().map(|s| &*s.id).collect();
169 self.streams.retain(|k, _| stream_keys.contains(k.as_str()));
170
171 for stream in data.streams {
172 self.stream_upsert(stream.id.clone(), Some(stream));
173 }
174 }
175
176 fn client_upsert(&self, client: Client) {
178 let entry = self.clients.entry(client.id.clone());
179 if let Entry::Occupied(mut entry) = entry {
180 let entry = entry.get_mut();
181 *entry = client;
182 } else {
183 entry.insert(client);
184 }
185 }
186
187 fn client_remove(&self, id: String) {
188 self.clients.remove(&id);
189 }
190
191 fn client_partial_update(&self, id: String, update: ClientPartialUpdate) {
192 let entry = self.clients.entry(id);
193 if let Entry::Occupied(mut entry) = entry {
194 let entry = entry.get_mut();
195
196 match update {
197 ClientPartialUpdate::Volume(volume) => entry.config.volume = volume,
198 ClientPartialUpdate::Latency(latency) => entry.config.latency = latency,
199 ClientPartialUpdate::Name(name) => entry.config.name = name,
200 }
201 }
202 }
203
204 fn group_upsert(&self, group: Group, clients: HashSet<String>) {
206 let entry = self.groups.entry(group.id.clone());
207 if let Entry::Occupied(mut entry) = entry {
208 let entry = entry.get_mut();
209
210 entry.name = group.name;
211 entry.stream_id = group.stream_id;
212 entry.muted = group.muted;
213 entry.clients = clients;
214 } else {
215 entry.insert(StateGroup {
216 id: group.id.clone(),
217 name: group.name,
218 stream_id: group.stream_id.clone(),
219 muted: group.muted,
220 clients,
221 });
222 }
223 }
224
225 fn group_partial_update(&self, id: String, update: GroupPartialUpdate) {
226 let entry = self.groups.entry(id.clone());
227 if let Entry::Occupied(mut entry) = entry {
228 let entry = entry.get_mut();
229
230 match update {
231 GroupPartialUpdate::Name(name) => entry.name = name,
232 GroupPartialUpdate::Muted(muted) => entry.muted = muted,
233 GroupPartialUpdate::StreamId(stream_id) => {
234 entry.stream_id = stream_id;
235 }
236 }
237 }
238 }
239
240 fn server_details_upsert(&self, server: ServerDetails) {
242 if self.server.get().is_none() {
243 self.server.set(RwLock::new(server)).expect("this should never fail");
244 } else {
245 let mut entry = self.server.get().unwrap().write().expect("rwlock poisoned");
246 *entry = server;
247 }
248 }
249
250 fn stream_upsert(&self, id: String, stream: Option<Stream>) {
252 let entry = self.streams.entry(id);
253 if let Entry::Occupied(mut entry) = entry {
254 let entry = entry.get_mut();
255 *entry = stream;
256 } else {
257 entry.insert(stream);
258 }
259 }
260
261 fn stream_partial_update(&self, id: String, update: StreamPartialUpdate) {
262 let entry = self.streams.entry(id);
263 if let Entry::Occupied(mut entry) = entry {
264 let entry = entry.get_mut();
265
266 match update {
267 StreamPartialUpdate::Properties(properties) => {
268 if let Some(entry) = entry {
269 entry.properties = Some(properties);
270 }
271 }
272 }
273 }
274 }
275}