1use std::any::{Any, TypeId};
29use std::collections::{HashMap, HashSet};
30use std::net::IpAddr;
31use std::sync::{mpsc, Arc, Mutex, OnceLock};
32use std::thread::JoinHandle;
33use std::time::{Duration, Instant};
34
35use parking_lot::RwLock;
36
37use sonos_api::Service;
38use sonos_discovery::Device;
39use sonos_event_manager::{SonosEventManager, WatchRegistry};
40use tracing::info;
41
42use crate::event_worker::spawn_state_event_worker;
43use crate::iter::ChangeIterator;
44use crate::model::{GroupId, SpeakerId, SpeakerInfo};
45use crate::property::{GroupInfo, Property, SonosProperty, Topology};
46use crate::{Result, StateError};
47
48pub type EventInitFn = Arc<
54 dyn Fn() -> std::result::Result<(), Box<dyn std::error::Error + Send + Sync>> + Send + Sync,
55>;
56
57#[derive(Debug, Clone)]
63pub struct ChangeEvent {
64 pub speaker_id: SpeakerId,
66 pub property_key: &'static str,
68 pub service: Service,
70 pub timestamp: Instant,
72}
73
74impl ChangeEvent {
75 pub fn new(speaker_id: SpeakerId, property_key: &'static str, service: Service) -> Self {
76 Self {
77 speaker_id,
78 property_key,
79 service,
80 timestamp: Instant::now(),
81 }
82 }
83}
84
85pub struct StateStore {
91 pub(crate) speakers: HashMap<SpeakerId, SpeakerInfo>,
93 pub(crate) ip_to_speaker: HashMap<IpAddr, SpeakerId>,
95 pub(crate) speaker_props: HashMap<SpeakerId, PropertyBag>,
97 pub(crate) groups: HashMap<GroupId, GroupInfo>,
99 pub(crate) group_props: HashMap<GroupId, PropertyBag>,
101 pub(crate) system_props: PropertyBag,
103 pub(crate) speaker_to_group: HashMap<SpeakerId, GroupId>,
105}
106
107impl StateStore {
108 pub(crate) fn new() -> Self {
109 Self {
110 speakers: HashMap::new(),
111 ip_to_speaker: HashMap::new(),
112 speaker_props: HashMap::new(),
113 groups: HashMap::new(),
114 group_props: HashMap::new(),
115 system_props: PropertyBag::new(),
116 speaker_to_group: HashMap::new(),
117 }
118 }
119
120 pub(crate) fn add_speaker(&mut self, speaker: SpeakerInfo) {
121 let id = speaker.id.clone();
122 let ip = speaker.ip_address;
123 self.ip_to_speaker.insert(ip, id.clone());
124 self.speakers.insert(id.clone(), speaker);
125 self.speaker_props
126 .entry(id)
127 .or_insert_with(PropertyBag::new);
128 }
129
130 fn speaker(&self, id: &SpeakerId) -> Option<&SpeakerInfo> {
131 self.speakers.get(id)
132 }
133
134 fn speakers(&self) -> Vec<SpeakerInfo> {
135 self.speakers.values().cloned().collect()
136 }
137
138 pub(crate) fn add_group(&mut self, group: GroupInfo) {
139 let id = group.id.clone();
140 for member_id in &group.member_ids {
142 self.speaker_to_group.insert(member_id.clone(), id.clone());
143 }
144 self.groups.insert(id.clone(), group);
145 self.group_props.entry(id).or_insert_with(PropertyBag::new);
146 }
147
148 #[allow(dead_code)]
150 pub(crate) fn get_group_for_speaker(&self, speaker_id: &SpeakerId) -> Option<&GroupInfo> {
151 let group_id = self.speaker_to_group.get(speaker_id)?;
152 self.groups.get(group_id)
153 }
154
155 pub(crate) fn clear_groups(&mut self) {
159 self.groups.clear();
160 self.group_props.clear();
161 self.speaker_to_group.clear();
162 }
163
164 pub(crate) fn get<P: Property>(&self, speaker_id: &SpeakerId) -> Option<P> {
165 self.speaker_props.get(speaker_id)?.get::<P>()
166 }
167
168 pub(crate) fn set<P: Property>(&mut self, speaker_id: &SpeakerId, value: P) -> bool {
169 let bag = self
170 .speaker_props
171 .entry(speaker_id.clone())
172 .or_insert_with(PropertyBag::new);
173 bag.set(value)
174 }
175
176 pub(crate) fn get_group<P: Property>(&self, group_id: &GroupId) -> Option<P> {
177 self.group_props.get(group_id)?.get::<P>()
178 }
179
180 pub(crate) fn set_group<P: Property>(&mut self, group_id: &GroupId, value: P) -> bool {
181 let bag = self
182 .group_props
183 .entry(group_id.clone())
184 .or_insert_with(PropertyBag::new);
185 bag.set(value)
186 }
187
188 fn set_system<P: Property>(&mut self, value: P) -> bool {
189 self.system_props.set(value)
190 }
191
192 fn is_empty(&self) -> bool {
193 self.speakers.is_empty()
194 }
195
196 fn speaker_count(&self) -> usize {
197 self.speakers.len()
198 }
199
200 fn group_count(&self) -> usize {
201 self.groups.len()
202 }
203}
204
205pub(crate) struct PropertyBag {
210 values: HashMap<TypeId, Box<dyn Any + Send + Sync>>,
212}
213
214impl PropertyBag {
215 pub(crate) fn new() -> Self {
216 Self {
217 values: HashMap::new(),
218 }
219 }
220
221 fn get<P: Property>(&self) -> Option<P> {
222 let type_id = TypeId::of::<P>();
223 self.values
224 .get(&type_id)
225 .and_then(|boxed| boxed.downcast_ref::<P>())
226 .cloned()
227 }
228
229 fn set<P: Property>(&mut self, value: P) -> bool {
230 let type_id = TypeId::of::<P>();
231 let current = self
232 .values
233 .get(&type_id)
234 .and_then(|boxed| boxed.downcast_ref::<P>());
235
236 if current != Some(&value) {
237 self.values.insert(type_id, Box::new(value));
238 true
239 } else {
240 false
241 }
242 }
243}
244
245pub struct StateManager {
254 store: Arc<RwLock<StateStore>>,
256
257 watched: Arc<RwLock<HashSet<(SpeakerId, &'static str)>>>,
259
260 ip_to_speaker: Arc<RwLock<HashMap<IpAddr, SpeakerId>>>,
262
263 event_manager: OnceLock<Arc<SonosEventManager>>,
265
266 event_tx: mpsc::Sender<ChangeEvent>,
268
269 event_rx: Arc<Mutex<mpsc::Receiver<ChangeEvent>>>,
271
272 _worker: Mutex<Option<JoinHandle<()>>>,
274
275 cleanup_timeout: Duration,
277
278 key_to_service: Arc<RwLock<HashMap<&'static str, Service>>>,
281
282 event_init: OnceLock<EventInitFn>,
285}
286
287struct StateWatchRegistry {
297 watched: Arc<RwLock<HashSet<(SpeakerId, &'static str)>>>,
298 ip_to_speaker: Arc<RwLock<HashMap<IpAddr, SpeakerId>>>,
299 key_to_service: Arc<RwLock<HashMap<&'static str, Service>>>,
300}
301
302impl WatchRegistry for StateWatchRegistry {
303 fn register_watch(&self, speaker_id: &SpeakerId, key: &'static str, service: Service) {
304 self.watched.write().insert((speaker_id.clone(), key));
305 self.key_to_service.write().insert(key, service);
306 }
307
308 fn unregister_watches_for_service(&self, ip: IpAddr, service: Service) {
309 let speaker_id = match self.ip_to_speaker.read().get(&ip).cloned() {
311 Some(id) => id,
312 None => {
313 tracing::warn!(
314 "unregister_watches_for_service: no speaker found for IP {}",
315 ip
316 );
317 return;
318 }
319 };
320
321 let service_keys: Vec<&'static str> = self
323 .key_to_service
324 .read()
325 .iter()
326 .filter(|(_, &svc)| svc == service)
327 .map(|(&key, _)| key)
328 .collect();
329
330 let mut watched = self.watched.write();
332 for key in service_keys {
333 watched.remove(&(speaker_id.clone(), key));
334 }
335 }
336}
337
338impl StateManager {
339 pub fn new() -> Result<Self> {
347 Self::builder().build()
348 }
349
350 pub fn builder() -> StateManagerBuilder {
352 StateManagerBuilder::default()
353 }
354
355 pub fn add_devices(&self, devices: Vec<Device>) -> Result<()> {
364 let mut store = self.store.write();
365 let mut ip_map = self.ip_to_speaker.write();
366
367 for device in devices {
368 let speaker_id = SpeakerId::new(&device.id);
369 let ip: IpAddr = device
370 .ip_address
371 .parse()
372 .map_err(|_| StateError::InvalidIpAddress(device.ip_address.clone()))?;
373
374 let friendly_name = if device.room_name.is_empty() || device.room_name == "Unknown" {
375 device.name.clone()
376 } else {
377 device.room_name.clone()
378 };
379
380 let info = SpeakerInfo {
381 id: speaker_id.clone(),
382 name: friendly_name,
383 room_name: device.room_name.clone(),
384 ip_address: ip,
385 port: device.port,
386 model_name: device.model_name.clone(),
387 software_version: "unknown".to_string(),
388 boot_seq: 0,
389 satellites: vec![],
390 };
391
392 ip_map.insert(ip, speaker_id.clone());
394 tracing::debug!(
395 "Added speaker {} at IP {} to ip_to_speaker map",
396 speaker_id.as_str(),
397 ip
398 );
399
400 store.add_speaker(info);
401 }
402
403 drop(store);
405 drop(ip_map);
406
407 if let Some(em) = self.event_manager.get() {
408 let devices_for_em: Vec<_> = self
409 .speaker_infos()
410 .iter()
411 .map(|info| sonos_discovery::Device {
412 id: info.id.as_str().to_string(),
413 name: info.name.clone(),
414 room_name: info.room_name.clone(),
415 ip_address: info.ip_address.to_string(),
416 port: info.port,
417 model_name: info.model_name.clone(),
418 })
419 .collect();
420
421 if let Err(e) = em.add_devices(devices_for_em) {
422 tracing::warn!("Failed to add devices to event manager: {}", e);
423 }
424 }
425
426 Ok(())
427 }
428
429 pub fn speaker_infos(&self) -> Vec<SpeakerInfo> {
431 self.store.read().speakers()
432 }
433
434 pub fn speaker_info(&self, speaker_id: &SpeakerId) -> Option<SpeakerInfo> {
436 self.store.read().speaker(speaker_id).cloned()
437 }
438
439 pub fn get_speaker_ip(&self, speaker_id: &SpeakerId) -> Option<IpAddr> {
441 self.store.read().speaker(speaker_id).map(|s| s.ip_address)
442 }
443
444 pub fn get_boot_seq(&self, speaker_id: &SpeakerId) -> Option<u32> {
446 self.store.read().speaker(speaker_id).map(|s| s.boot_seq)
447 }
448
449 pub fn iter(&self) -> ChangeIterator {
465 ChangeIterator::new(Arc::clone(&self.event_rx))
466 }
467
468 pub fn get_property<P: Property>(&self, speaker_id: &SpeakerId) -> Option<P> {
470 self.store.read().get::<P>(speaker_id)
471 }
472
473 pub fn get_group_property<P: Property>(&self, group_id: &GroupId) -> Option<P> {
475 self.store.read().get_group::<P>(group_id)
476 }
477
478 pub fn set_property<P: SonosProperty>(&self, speaker_id: &SpeakerId, value: P) {
483 let changed = {
484 let mut store = self.store.write();
485 store.set::<P>(speaker_id, value)
486 };
487
488 if changed {
489 self.maybe_emit_change(speaker_id, P::KEY, P::SERVICE);
490 }
491 }
492
493 pub fn set_group_property<P: SonosProperty>(&self, group_id: &GroupId, value: P) {
499 let coordinator_id = {
500 let mut store = self.store.write();
501 let changed = store.set_group::<P>(group_id, value);
502 if !changed {
503 return;
504 }
505 store.groups.get(group_id).map(|g| g.coordinator_id.clone())
506 };
507
508 if let Some(coordinator_id) = coordinator_id {
509 self.maybe_emit_change(&coordinator_id, P::KEY, P::SERVICE);
510 }
511 }
512
513 pub fn register_watch(&self, speaker_id: &SpeakerId, property_key: &'static str) {
515 self.watched
516 .write()
517 .insert((speaker_id.clone(), property_key));
518 }
519
520 pub fn unregister_watch(&self, speaker_id: &SpeakerId, property_key: &'static str) {
522 self.watched
523 .write()
524 .remove(&(speaker_id.clone(), property_key));
525 }
526
527 pub fn watch_property_with_subscription<P: SonosProperty>(
535 &self,
536 speaker_id: &SpeakerId,
537 ) -> Result<Option<P>> {
538 self.register_watch(speaker_id, P::KEY);
540
541 if let Some(em) = self.event_manager.get() {
543 if let Some(ip) = self.get_speaker_ip(speaker_id) {
545 if let Err(e) = em.ensure_service_subscribed(ip, P::SERVICE) {
546 tracing::warn!(
547 "Failed to subscribe to {:?} for {}: {}",
548 P::SERVICE,
549 speaker_id.as_str(),
550 e
551 );
552 }
553 }
554 }
555
556 Ok(self.get_property::<P>(speaker_id))
557 }
558
559 pub fn unwatch_property_with_subscription<P: SonosProperty>(&self, speaker_id: &SpeakerId) {
561 self.unregister_watch(speaker_id, P::KEY);
563
564 if let Some(em) = self.event_manager.get() {
566 if let Some(ip) = self.get_speaker_ip(speaker_id) {
567 if let Err(e) = em.release_service_subscription(ip, P::SERVICE) {
568 tracing::warn!(
569 "Failed to unsubscribe from {:?} for {}: {}",
570 P::SERVICE,
571 speaker_id.as_str(),
572 e
573 );
574 }
575 }
576 }
577 }
578
579 pub fn is_watched(&self, speaker_id: &SpeakerId, property_key: &'static str) -> bool {
581 self.watched
582 .read()
583 .contains(&(speaker_id.clone(), property_key))
584 }
585
586 fn maybe_emit_change(
588 &self,
589 speaker_id: &SpeakerId,
590 property_key: &'static str,
591 service: Service,
592 ) {
593 let is_watched = self
594 .watched
595 .read()
596 .contains(&(speaker_id.clone(), property_key));
597
598 if is_watched {
599 let event = ChangeEvent::new(speaker_id.clone(), property_key, service);
600 let _ = self.event_tx.send(event);
601 }
602 }
603
604 pub fn initialize(&self, topology: Topology) {
606 let mut store = self.store.write();
607 for speaker in &topology.speakers {
608 store.add_speaker(speaker.clone());
609 }
610 for group in &topology.groups {
611 store.add_group(group.clone());
612 }
613 store.set_system(topology);
614 }
615
616 pub fn is_initialized(&self) -> bool {
618 !self.store.read().is_empty()
619 }
620
621 pub fn speaker_count(&self) -> usize {
623 self.store.read().speaker_count()
624 }
625
626 pub fn group_count(&self) -> usize {
628 self.store.read().group_count()
629 }
630
631 pub fn groups(&self) -> Vec<GroupInfo> {
636 self.store.read().groups.values().cloned().collect()
637 }
638
639 pub fn get_group(&self, group_id: &GroupId) -> Option<GroupInfo> {
641 self.store.read().groups.get(group_id).cloned()
642 }
643
644 pub fn get_group_for_speaker(&self, speaker_id: &SpeakerId) -> Option<GroupInfo> {
648 let store = self.store.read();
649 let group_id = store.speaker_to_group.get(speaker_id)?;
650 store.groups.get(group_id).cloned()
651 }
652
653 pub fn event_manager(&self) -> Option<&Arc<SonosEventManager>> {
658 self.event_manager.get()
659 }
660
661 pub fn set_event_manager(&self, em: Arc<SonosEventManager>) -> Result<()> {
666 tracing::debug!("StateManager::set_event_manager called");
667 if self.event_manager.set(Arc::clone(&em)).is_err() {
668 tracing::debug!("Event manager already set — no-op");
669 return Ok(()); }
671
672 em.set_watch_registry(Arc::new(StateWatchRegistry {
674 watched: Arc::clone(&self.watched),
675 ip_to_speaker: Arc::clone(&self.ip_to_speaker),
676 key_to_service: Arc::clone(&self.key_to_service),
677 }));
678
679 let devices_for_em: Vec<_> = self
681 .speaker_infos()
682 .iter()
683 .map(|info| sonos_discovery::Device {
684 id: info.id.as_str().to_string(),
685 name: info.name.clone(),
686 room_name: info.room_name.clone(),
687 ip_address: info.ip_address.to_string(),
688 port: info.port,
689 model_name: info.model_name.clone(),
690 })
691 .collect();
692
693 if let Err(e) = em.add_devices(devices_for_em) {
694 tracing::warn!(
695 "Failed to add devices to event manager during lazy init: {}",
696 e
697 );
698 }
699
700 let worker = spawn_state_event_worker(
702 em,
703 Arc::clone(&self.store),
704 Arc::clone(&self.watched),
705 self.event_tx.clone(),
706 Arc::clone(&self.ip_to_speaker),
707 );
708 info!("StateManager event worker started (lazy init)");
709
710 if let Ok(mut w) = self._worker.lock() {
711 *w = Some(worker);
712 }
713
714 Ok(())
715 }
716
717 pub fn set_event_init(&self, f: EventInitFn) {
722 let _ = self.event_init.set(f);
723 }
724
725 pub fn event_init(&self) -> Option<&EventInitFn> {
730 self.event_init.get()
731 }
732}
733
734impl Clone for StateManager {
735 fn clone(&self) -> Self {
736 let event_manager = OnceLock::new();
737 if let Some(em) = self.event_manager.get() {
738 let _ = event_manager.set(Arc::clone(em));
739 }
740 let event_init = OnceLock::new();
741 if let Some(f) = self.event_init.get() {
742 let _ = event_init.set(Arc::clone(f));
743 }
744 Self {
745 store: Arc::clone(&self.store),
746 watched: Arc::clone(&self.watched),
747 ip_to_speaker: Arc::clone(&self.ip_to_speaker),
748 event_manager,
749 event_tx: self.event_tx.clone(),
750 event_rx: Arc::clone(&self.event_rx),
751 _worker: Mutex::new(None),
752 cleanup_timeout: self.cleanup_timeout,
753 key_to_service: Arc::clone(&self.key_to_service),
754 event_init,
755 }
756 }
757}
758
759pub struct StateManagerBuilder {
765 cleanup_timeout: Duration,
766 event_manager: Option<Arc<SonosEventManager>>,
767}
768
769impl Default for StateManagerBuilder {
770 fn default() -> Self {
771 Self {
772 cleanup_timeout: Duration::from_secs(5),
773 event_manager: None,
774 }
775 }
776}
777
778impl StateManagerBuilder {
779 pub fn cleanup_timeout(mut self, timeout: Duration) -> Self {
781 self.cleanup_timeout = timeout;
782 self
783 }
784
785 pub fn with_event_manager(mut self, em: Arc<SonosEventManager>) -> Self {
792 self.event_manager = Some(em);
793 self
794 }
795
796 pub fn build(self) -> Result<StateManager> {
798 let (event_tx, event_rx) = mpsc::channel();
799
800 let store = Arc::new(RwLock::new(StateStore::new()));
801 let watched = Arc::new(RwLock::new(HashSet::new()));
802 let ip_to_speaker = Arc::new(RwLock::new(HashMap::new()));
803 let key_to_service = Arc::new(RwLock::new(HashMap::new()));
804
805 let event_manager_lock = OnceLock::new();
806 let mut worker = None;
807
808 if let Some(em) = self.event_manager {
810 let _ = event_manager_lock.set(Arc::clone(&em));
811
812 em.set_watch_registry(Arc::new(StateWatchRegistry {
814 watched: Arc::clone(&watched),
815 ip_to_speaker: Arc::clone(&ip_to_speaker),
816 key_to_service: Arc::clone(&key_to_service),
817 }));
818
819 let worker_handle = spawn_state_event_worker(
820 em,
821 Arc::clone(&store),
822 Arc::clone(&watched),
823 event_tx.clone(),
824 Arc::clone(&ip_to_speaker),
825 );
826 info!("StateManager event worker started");
827 worker = Some(worker_handle);
828 }
829
830 let manager = StateManager {
831 store,
832 watched,
833 ip_to_speaker,
834 event_manager: event_manager_lock,
835 event_tx,
836 event_rx: Arc::new(Mutex::new(event_rx)),
837 _worker: Mutex::new(worker),
838 cleanup_timeout: self.cleanup_timeout,
839 key_to_service,
840 event_init: OnceLock::new(),
841 };
842
843 info!("StateManager created (sync-first mode)");
844 Ok(manager)
845 }
846}
847
848#[cfg(test)]
849mod tests {
850 use super::*;
851 use crate::property::{GroupVolume, Volume};
852 use sonos_api::Service;
853
854 #[test]
855 fn test_state_manager_creation() {
856 let manager = StateManager::new().unwrap();
857 assert!(!manager.is_initialized());
858 assert_eq!(manager.speaker_count(), 0);
859 }
860
861 #[test]
862 fn test_add_devices() {
863 let manager = StateManager::new().unwrap();
864
865 let devices = vec![Device {
866 id: "RINCON_123".to_string(),
867 name: "Living Room".to_string(),
868 room_name: "Living Room".to_string(),
869 ip_address: "192.168.1.100".to_string(),
870 port: 1400,
871 model_name: "Sonos One".to_string(),
872 }];
873
874 manager.add_devices(devices).unwrap();
875 assert_eq!(manager.speaker_count(), 1);
876 }
877
878 #[test]
879 fn test_property_storage() {
880 let manager = StateManager::new().unwrap();
881
882 let devices = vec![Device {
883 id: "RINCON_123".to_string(),
884 name: "Living Room".to_string(),
885 room_name: "Living Room".to_string(),
886 ip_address: "192.168.1.100".to_string(),
887 port: 1400,
888 model_name: "Sonos One".to_string(),
889 }];
890 manager.add_devices(devices).unwrap();
891
892 let speaker_id = SpeakerId::new("RINCON_123");
893
894 assert!(manager.get_property::<Volume>(&speaker_id).is_none());
896
897 manager.set_property(&speaker_id, Volume::new(50));
899 assert_eq!(
900 manager.get_property::<Volume>(&speaker_id),
901 Some(Volume::new(50))
902 );
903 }
904
905 #[test]
906 fn test_watch_registration() {
907 let manager = StateManager::new().unwrap();
908
909 let devices = vec![Device {
910 id: "RINCON_123".to_string(),
911 name: "Living Room".to_string(),
912 room_name: "Living Room".to_string(),
913 ip_address: "192.168.1.100".to_string(),
914 port: 1400,
915 model_name: "Sonos One".to_string(),
916 }];
917 manager.add_devices(devices).unwrap();
918
919 let speaker_id = SpeakerId::new("RINCON_123");
920
921 assert!(!manager.is_watched(&speaker_id, "volume"));
923
924 manager.register_watch(&speaker_id, "volume");
926 assert!(manager.is_watched(&speaker_id, "volume"));
927
928 manager.unregister_watch(&speaker_id, "volume");
930 assert!(!manager.is_watched(&speaker_id, "volume"));
931 }
932
933 #[test]
934 fn test_change_event_emission() {
935 let manager = StateManager::new().unwrap();
936
937 let devices = vec![Device {
938 id: "RINCON_123".to_string(),
939 name: "Living Room".to_string(),
940 room_name: "Living Room".to_string(),
941 ip_address: "192.168.1.100".to_string(),
942 port: 1400,
943 model_name: "Sonos One".to_string(),
944 }];
945 manager.add_devices(devices).unwrap();
946
947 let speaker_id = SpeakerId::new("RINCON_123");
948
949 manager.register_watch(&speaker_id, "volume");
951
952 manager.set_property(&speaker_id, Volume::new(75));
954
955 let iter = manager.iter();
957 let event = iter.recv_timeout(std::time::Duration::from_millis(100));
958 assert!(event.is_some());
959
960 let event = event.unwrap();
961 assert_eq!(event.speaker_id.as_str(), "RINCON_123");
962 assert_eq!(event.property_key, "volume");
963 }
964
965 #[test]
966 fn test_set_group_property_emits_change_event() {
967 let manager = StateManager::new().unwrap();
968
969 let devices = vec![Device {
970 id: "RINCON_123".to_string(),
971 name: "Living Room".to_string(),
972 room_name: "Living Room".to_string(),
973 ip_address: "192.168.1.100".to_string(),
974 port: 1400,
975 model_name: "Sonos One".to_string(),
976 }];
977 manager.add_devices(devices).unwrap();
978
979 let speaker_id = SpeakerId::new("RINCON_123");
980 let group_id = GroupId::new("RINCON_123:1");
981
982 {
984 let mut store = manager.store.write();
985 store.add_group(GroupInfo::new(
986 group_id.clone(),
987 speaker_id.clone(),
988 vec![speaker_id.clone()],
989 ));
990 }
991
992 manager.register_watch(&speaker_id, "group_volume");
994
995 manager.set_group_property(&group_id, GroupVolume::new(80));
997
998 let iter = manager.iter();
1000 let event = iter.recv_timeout(std::time::Duration::from_millis(100));
1001 assert!(event.is_some());
1002
1003 let event = event.unwrap();
1004 assert_eq!(event.speaker_id.as_str(), "RINCON_123");
1005 assert_eq!(event.property_key, "group_volume");
1006 assert_eq!(event.service, Service::GroupRenderingControl);
1007 }
1008
1009 #[test]
1010 fn test_set_group_property_no_event_when_unwatched() {
1011 let manager = StateManager::new().unwrap();
1012
1013 let devices = vec![Device {
1014 id: "RINCON_123".to_string(),
1015 name: "Living Room".to_string(),
1016 room_name: "Living Room".to_string(),
1017 ip_address: "192.168.1.100".to_string(),
1018 port: 1400,
1019 model_name: "Sonos One".to_string(),
1020 }];
1021 manager.add_devices(devices).unwrap();
1022
1023 let speaker_id = SpeakerId::new("RINCON_123");
1024 let group_id = GroupId::new("RINCON_123:1");
1025
1026 {
1027 let mut store = manager.store.write();
1028 store.add_group(GroupInfo::new(
1029 group_id.clone(),
1030 speaker_id.clone(),
1031 vec![speaker_id.clone()],
1032 ));
1033 }
1034
1035 manager.set_group_property(&group_id, GroupVolume::new(50));
1037
1038 let iter = manager.iter();
1039 let event = iter.recv_timeout(std::time::Duration::from_millis(100));
1040 assert!(event.is_none());
1041 }
1042
1043 #[test]
1048 fn test_add_group_updates_speaker_to_group() {
1049 let mut store = StateStore::new();
1050
1051 let speaker1 = SpeakerId::new("RINCON_111");
1052 let speaker2 = SpeakerId::new("RINCON_222");
1053 let group_id = GroupId::new("RINCON_111:1");
1054
1055 let group = GroupInfo::new(
1056 group_id.clone(),
1057 speaker1.clone(),
1058 vec![speaker1.clone(), speaker2.clone()],
1059 );
1060
1061 store.add_group(group);
1062
1063 assert_eq!(store.speaker_to_group.get(&speaker1), Some(&group_id));
1065 assert_eq!(store.speaker_to_group.get(&speaker2), Some(&group_id));
1066 }
1067
1068 #[test]
1069 fn test_add_group_single_speaker() {
1070 let mut store = StateStore::new();
1071
1072 let speaker = SpeakerId::new("RINCON_333");
1073 let group_id = GroupId::new("RINCON_333:1");
1074
1075 let group = GroupInfo::new(group_id.clone(), speaker.clone(), vec![speaker.clone()]);
1076
1077 store.add_group(group.clone());
1078
1079 assert_eq!(store.speaker_to_group.get(&speaker), Some(&group_id));
1081
1082 assert_eq!(store.groups.get(&group_id), Some(&group));
1084 }
1085
1086 #[test]
1087 fn test_get_group_for_speaker_returns_correct_group() {
1088 let mut store = StateStore::new();
1089
1090 let speaker1 = SpeakerId::new("RINCON_111");
1091 let speaker2 = SpeakerId::new("RINCON_222");
1092 let speaker3 = SpeakerId::new("RINCON_333");
1093 let group1_id = GroupId::new("RINCON_111:1");
1094 let group2_id = GroupId::new("RINCON_333:1");
1095
1096 let group1 = GroupInfo::new(
1098 group1_id.clone(),
1099 speaker1.clone(),
1100 vec![speaker1.clone(), speaker2.clone()],
1101 );
1102
1103 let group2 = GroupInfo::new(group2_id.clone(), speaker3.clone(), vec![speaker3.clone()]);
1105
1106 store.add_group(group1.clone());
1107 store.add_group(group2.clone());
1108
1109 assert_eq!(store.get_group_for_speaker(&speaker1), Some(&group1));
1111 assert_eq!(store.get_group_for_speaker(&speaker2), Some(&group1));
1112 assert_eq!(store.get_group_for_speaker(&speaker3), Some(&group2));
1113 }
1114
1115 #[test]
1116 fn test_get_group_for_speaker_returns_none_for_unknown() {
1117 let store = StateStore::new();
1118
1119 let unknown_speaker = SpeakerId::new("RINCON_UNKNOWN");
1120
1121 assert!(store.get_group_for_speaker(&unknown_speaker).is_none());
1122 }
1123
1124 #[test]
1125 fn test_clear_groups_removes_all_group_data() {
1126 let mut store = StateStore::new();
1127
1128 let speaker1 = SpeakerId::new("RINCON_111");
1129 let speaker2 = SpeakerId::new("RINCON_222");
1130 let group_id = GroupId::new("RINCON_111:1");
1131
1132 let group = GroupInfo::new(
1133 group_id.clone(),
1134 speaker1.clone(),
1135 vec![speaker1.clone(), speaker2.clone()],
1136 );
1137
1138 store.add_group(group);
1139
1140 assert!(!store.groups.is_empty());
1142 assert!(!store.speaker_to_group.is_empty());
1143
1144 store.clear_groups();
1146
1147 assert!(store.groups.is_empty());
1149 assert!(store.group_props.is_empty());
1150 assert!(store.speaker_to_group.is_empty());
1151 }
1152
1153 #[test]
1154 fn test_clear_groups_then_add_new_groups() {
1155 let mut store = StateStore::new();
1156
1157 let speaker1 = SpeakerId::new("RINCON_111");
1159 let group1_id = GroupId::new("RINCON_111:1");
1160 let group1 = GroupInfo::new(group1_id.clone(), speaker1.clone(), vec![speaker1.clone()]);
1161 store.add_group(group1);
1162
1163 store.clear_groups();
1165
1166 let speaker2 = SpeakerId::new("RINCON_222");
1167 let group2_id = GroupId::new("RINCON_222:1");
1168 let group2 = GroupInfo::new(group2_id.clone(), speaker2.clone(), vec![speaker2.clone()]);
1169 store.add_group(group2.clone());
1170
1171 assert!(!store.groups.contains_key(&group1_id));
1173 assert_eq!(store.groups.get(&group2_id), Some(&group2));
1174
1175 assert!(!store.speaker_to_group.contains_key(&speaker1));
1177 assert_eq!(store.speaker_to_group.get(&speaker2), Some(&group2_id));
1178 }
1179
1180 #[test]
1185 fn test_state_manager_groups_returns_all_groups() {
1186 let manager = StateManager::new().unwrap();
1187
1188 let devices = vec![
1190 Device {
1191 id: "RINCON_111".to_string(),
1192 name: "Living Room".to_string(),
1193 room_name: "Living Room".to_string(),
1194 ip_address: "192.168.1.100".to_string(),
1195 port: 1400,
1196 model_name: "Sonos One".to_string(),
1197 },
1198 Device {
1199 id: "RINCON_222".to_string(),
1200 name: "Kitchen".to_string(),
1201 room_name: "Kitchen".to_string(),
1202 ip_address: "192.168.1.101".to_string(),
1203 port: 1400,
1204 model_name: "Sonos One".to_string(),
1205 },
1206 ];
1207 manager.add_devices(devices).unwrap();
1208
1209 let speaker1 = SpeakerId::new("RINCON_111");
1211 let speaker2 = SpeakerId::new("RINCON_222");
1212 let group1 = GroupInfo::new(
1213 GroupId::new("RINCON_111:1"),
1214 speaker1.clone(),
1215 vec![speaker1.clone()],
1216 );
1217 let group2 = GroupInfo::new(
1218 GroupId::new("RINCON_222:1"),
1219 speaker2.clone(),
1220 vec![speaker2.clone()],
1221 );
1222
1223 let topology = Topology::new(
1224 manager.speaker_infos(),
1225 vec![group1.clone(), group2.clone()],
1226 );
1227 manager.initialize(topology);
1228
1229 let groups = manager.groups();
1231 assert_eq!(groups.len(), 2);
1232
1233 let group_ids: Vec<_> = groups.iter().map(|g| g.id.clone()).collect();
1235 assert!(group_ids.contains(&GroupId::new("RINCON_111:1")));
1236 assert!(group_ids.contains(&GroupId::new("RINCON_222:1")));
1237 }
1238
1239 #[test]
1240 fn test_state_manager_groups_returns_empty_when_no_groups() {
1241 let manager = StateManager::new().unwrap();
1242
1243 let groups = manager.groups();
1245 assert!(groups.is_empty());
1246 }
1247
1248 #[test]
1249 fn test_state_manager_get_group_returns_correct_group() {
1250 let manager = StateManager::new().unwrap();
1251
1252 let devices = vec![Device {
1254 id: "RINCON_111".to_string(),
1255 name: "Living Room".to_string(),
1256 room_name: "Living Room".to_string(),
1257 ip_address: "192.168.1.100".to_string(),
1258 port: 1400,
1259 model_name: "Sonos One".to_string(),
1260 }];
1261 manager.add_devices(devices).unwrap();
1262
1263 let speaker = SpeakerId::new("RINCON_111");
1265 let group_id = GroupId::new("RINCON_111:1");
1266 let group = GroupInfo::new(group_id.clone(), speaker.clone(), vec![speaker.clone()]);
1267
1268 let topology = Topology::new(manager.speaker_infos(), vec![group.clone()]);
1269 manager.initialize(topology);
1270
1271 let found = manager.get_group(&group_id);
1273 assert!(found.is_some());
1274 assert_eq!(found.unwrap(), group);
1275 }
1276
1277 #[test]
1278 fn test_state_manager_get_group_returns_none_for_unknown() {
1279 let manager = StateManager::new().unwrap();
1280
1281 let unknown_id = GroupId::new("RINCON_UNKNOWN:1");
1283 let found = manager.get_group(&unknown_id);
1284 assert!(found.is_none());
1285 }
1286
1287 #[test]
1288 fn test_state_manager_get_group_for_speaker_returns_correct_group() {
1289 let manager = StateManager::new().unwrap();
1290
1291 let devices = vec![
1293 Device {
1294 id: "RINCON_111".to_string(),
1295 name: "Living Room".to_string(),
1296 room_name: "Living Room".to_string(),
1297 ip_address: "192.168.1.100".to_string(),
1298 port: 1400,
1299 model_name: "Sonos One".to_string(),
1300 },
1301 Device {
1302 id: "RINCON_222".to_string(),
1303 name: "Kitchen".to_string(),
1304 room_name: "Kitchen".to_string(),
1305 ip_address: "192.168.1.101".to_string(),
1306 port: 1400,
1307 model_name: "Sonos One".to_string(),
1308 },
1309 ];
1310 manager.add_devices(devices).unwrap();
1311
1312 let speaker1 = SpeakerId::new("RINCON_111");
1314 let speaker2 = SpeakerId::new("RINCON_222");
1315 let group_id = GroupId::new("RINCON_111:1");
1316 let group = GroupInfo::new(
1317 group_id.clone(),
1318 speaker1.clone(),
1319 vec![speaker1.clone(), speaker2.clone()],
1320 );
1321
1322 let topology = Topology::new(manager.speaker_infos(), vec![group.clone()]);
1323 manager.initialize(topology);
1324
1325 let found1 = manager.get_group_for_speaker(&speaker1);
1327 assert!(found1.is_some());
1328 assert_eq!(found1.unwrap(), group);
1329
1330 let found2 = manager.get_group_for_speaker(&speaker2);
1331 assert!(found2.is_some());
1332 assert_eq!(found2.unwrap(), group);
1333 }
1334
1335 #[test]
1336 fn test_state_manager_get_group_for_speaker_returns_none_for_unknown() {
1337 let manager = StateManager::new().unwrap();
1338
1339 let unknown_speaker = SpeakerId::new("RINCON_UNKNOWN");
1341 let found = manager.get_group_for_speaker(&unknown_speaker);
1342 assert!(found.is_none());
1343 }
1344
1345 #[test]
1346 fn test_state_manager_group_methods_consistency() {
1347 let manager = StateManager::new().unwrap();
1348
1349 let devices = vec![Device {
1351 id: "RINCON_111".to_string(),
1352 name: "Living Room".to_string(),
1353 room_name: "Living Room".to_string(),
1354 ip_address: "192.168.1.100".to_string(),
1355 port: 1400,
1356 model_name: "Sonos One".to_string(),
1357 }];
1358 manager.add_devices(devices).unwrap();
1359
1360 let speaker = SpeakerId::new("RINCON_111");
1362 let group_id = GroupId::new("RINCON_111:1");
1363 let group = GroupInfo::new(group_id.clone(), speaker.clone(), vec![speaker.clone()]);
1364
1365 let topology = Topology::new(manager.speaker_infos(), vec![group.clone()]);
1366 manager.initialize(topology);
1367
1368 let groups = manager.groups();
1370 assert_eq!(groups.len(), 1);
1371 assert_eq!(groups[0], group);
1372
1373 let by_id = manager.get_group(&group_id);
1374 assert_eq!(by_id, Some(group.clone()));
1375
1376 let by_speaker = manager.get_group_for_speaker(&speaker);
1377 assert_eq!(by_speaker, Some(group.clone()));
1378
1379 assert_eq!(groups[0], by_id.unwrap());
1381 assert_eq!(groups[0], by_speaker.unwrap());
1382 }
1383
1384 #[test]
1389 fn test_get_boot_seq_returns_none_for_unknown_speaker() {
1390 let manager = StateManager::new().unwrap();
1391 let unknown = SpeakerId::new("RINCON_UNKNOWN");
1392 assert!(manager.get_boot_seq(&unknown).is_none());
1393 }
1394
1395 #[test]
1396 fn test_boot_seq_defaults_to_zero_for_new_speaker() {
1397 let manager = StateManager::new().unwrap();
1398
1399 let devices = vec![Device {
1400 id: "RINCON_123".to_string(),
1401 name: "Living Room".to_string(),
1402 room_name: "Living Room".to_string(),
1403 ip_address: "192.168.1.100".to_string(),
1404 port: 1400,
1405 model_name: "Sonos One".to_string(),
1406 }];
1407 manager.add_devices(devices).unwrap();
1408
1409 let speaker_id = SpeakerId::new("RINCON_123");
1410
1411 assert_eq!(manager.get_boot_seq(&speaker_id), Some(0));
1413 }
1414
1415 #[test]
1420 fn test_state_watch_registry_register_and_unregister() {
1421 let watched = Arc::new(RwLock::new(HashSet::new()));
1422 let ip_to_speaker = Arc::new(RwLock::new(HashMap::new()));
1423 let key_to_service = Arc::new(RwLock::new(HashMap::new()));
1424
1425 let ip: IpAddr = "192.168.1.100".parse().unwrap();
1426 let speaker_id = SpeakerId::new("RINCON_123");
1427 ip_to_speaker.write().insert(ip, speaker_id.clone());
1428
1429 let registry = StateWatchRegistry {
1430 watched: Arc::clone(&watched),
1431 ip_to_speaker: Arc::clone(&ip_to_speaker),
1432 key_to_service: Arc::clone(&key_to_service),
1433 };
1434
1435 registry.register_watch(&speaker_id, "volume", Service::RenderingControl);
1437 registry.register_watch(&speaker_id, "mute", Service::RenderingControl);
1438 registry.register_watch(&speaker_id, "playback_state", Service::AVTransport);
1439
1440 assert_eq!(watched.read().len(), 3);
1441
1442 registry.unregister_watches_for_service(ip, Service::RenderingControl);
1444
1445 let w = watched.read();
1446 assert_eq!(w.len(), 1);
1447 assert!(w.contains(&(speaker_id.clone(), "playback_state")));
1448 assert!(!w.contains(&(speaker_id.clone(), "volume")));
1449 assert!(!w.contains(&(speaker_id.clone(), "mute")));
1450 }
1451
1452 #[test]
1453 fn test_state_watch_registry_unknown_ip_is_noop() {
1454 let watched = Arc::new(RwLock::new(HashSet::new()));
1455 let ip_to_speaker = Arc::new(RwLock::new(HashMap::new()));
1456 let key_to_service = Arc::new(RwLock::new(HashMap::new()));
1457
1458 let speaker_id = SpeakerId::new("RINCON_123");
1459
1460 let registry = StateWatchRegistry {
1461 watched: Arc::clone(&watched),
1462 ip_to_speaker,
1463 key_to_service: Arc::clone(&key_to_service),
1464 };
1465
1466 watched.write().insert((speaker_id.clone(), "volume"));
1468 key_to_service
1469 .write()
1470 .insert("volume", Service::RenderingControl);
1471
1472 let unknown_ip: IpAddr = "10.0.0.1".parse().unwrap();
1474 registry.unregister_watches_for_service(unknown_ip, Service::RenderingControl);
1475
1476 assert_eq!(watched.read().len(), 1);
1478 }
1479
1480 #[test]
1481 fn test_state_watch_registry_only_removes_matching_speaker() {
1482 let watched = Arc::new(RwLock::new(HashSet::new()));
1483 let ip_to_speaker = Arc::new(RwLock::new(HashMap::new()));
1484 let key_to_service = Arc::new(RwLock::new(HashMap::new()));
1485
1486 let ip1: IpAddr = "192.168.1.100".parse().unwrap();
1487 let ip2: IpAddr = "192.168.1.101".parse().unwrap();
1488 let speaker1 = SpeakerId::new("RINCON_111");
1489 let speaker2 = SpeakerId::new("RINCON_222");
1490
1491 ip_to_speaker.write().insert(ip1, speaker1.clone());
1492 ip_to_speaker.write().insert(ip2, speaker2.clone());
1493
1494 let registry = StateWatchRegistry {
1495 watched: Arc::clone(&watched),
1496 ip_to_speaker,
1497 key_to_service: Arc::clone(&key_to_service),
1498 };
1499
1500 registry.register_watch(&speaker1, "volume", Service::RenderingControl);
1502 registry.register_watch(&speaker2, "volume", Service::RenderingControl);
1503 assert_eq!(watched.read().len(), 2);
1504
1505 registry.unregister_watches_for_service(ip1, Service::RenderingControl);
1507
1508 let w = watched.read();
1509 assert_eq!(w.len(), 1);
1510 assert!(w.contains(&(speaker2.clone(), "volume")));
1511 assert!(!w.contains(&(speaker1.clone(), "volume")));
1512 }
1513}