Skip to main content

sonos_state/
state.rs

1//! Sync-first State Management for Sonos devices
2//!
3//! Provides a synchronous API for managing Sonos device state with
4//! background event processing.
5//!
6//! # Example
7//!
8//! ```rust,ignore
9//! use sonos_state::{StateManager, Volume};
10//! use sonos_discovery;
11//!
12//! // Create state manager (sync)
13//! let manager = StateManager::new()?;
14//! let devices = sonos_discovery::get();
15//! manager.add_devices(devices)?;
16//!
17//! // Get speakers
18//! for info in manager.speaker_infos() {
19//!     println!("{}: {}", info.name, info.ip_address);
20//! }
21//!
22//! // Blocking iteration over changes
23//! for event in manager.iter() {
24//!     println!("Change: {:?}", event);
25//! }
26//! ```
27
28use std::any::{Any, TypeId};
29use std::collections::{HashMap, HashSet};
30use std::net::IpAddr;
31use std::sync::{mpsc, Arc, Mutex, OnceLock};
32use std::thread::JoinHandle;
33use std::time::{Duration, Instant};
34
35use parking_lot::RwLock;
36
37use sonos_api::Service;
38use sonos_discovery::Device;
39use sonos_event_manager::{SonosEventManager, WatchRegistry};
40use tracing::info;
41
42use crate::event_worker::spawn_state_event_worker;
43use crate::iter::ChangeIterator;
44use crate::model::{GroupId, SpeakerId, SpeakerInfo};
45use crate::property::{GroupInfo, Property, SonosProperty, Topology};
46use crate::{Result, StateError};
47
48// ============================================================================
49// ChangeEvent - for iter()
50// ============================================================================
51
52/// A change event emitted when a watched property changes
53#[derive(Debug, Clone)]
54pub struct ChangeEvent {
55    /// Speaker or entity that changed
56    pub speaker_id: SpeakerId,
57    /// Property key that changed
58    pub property_key: &'static str,
59    /// Service the property belongs to
60    pub service: Service,
61    /// When the change occurred
62    pub timestamp: Instant,
63}
64
65impl ChangeEvent {
66    pub fn new(speaker_id: SpeakerId, property_key: &'static str, service: Service) -> Self {
67        Self {
68            speaker_id,
69            property_key,
70            service,
71            timestamp: Instant::now(),
72        }
73    }
74}
75
76// ============================================================================
77// Internal StateStore
78// ============================================================================
79
80/// Internal state storage
81pub struct StateStore {
82    /// Speaker metadata
83    pub(crate) speakers: HashMap<SpeakerId, SpeakerInfo>,
84    /// IP to speaker ID mapping
85    pub(crate) ip_to_speaker: HashMap<IpAddr, SpeakerId>,
86    /// Property values: (speaker_id, property_key) -> type-erased value
87    pub(crate) speaker_props: HashMap<SpeakerId, PropertyBag>,
88    /// Group metadata
89    pub(crate) groups: HashMap<GroupId, GroupInfo>,
90    /// Group properties
91    pub(crate) group_props: HashMap<GroupId, PropertyBag>,
92    /// System properties
93    pub(crate) system_props: PropertyBag,
94    /// Speaker to group mapping for quick lookups
95    pub(crate) speaker_to_group: HashMap<SpeakerId, GroupId>,
96}
97
98impl StateStore {
99    pub(crate) fn new() -> Self {
100        Self {
101            speakers: HashMap::new(),
102            ip_to_speaker: HashMap::new(),
103            speaker_props: HashMap::new(),
104            groups: HashMap::new(),
105            group_props: HashMap::new(),
106            system_props: PropertyBag::new(),
107            speaker_to_group: HashMap::new(),
108        }
109    }
110
111    pub(crate) fn add_speaker(&mut self, speaker: SpeakerInfo) {
112        let id = speaker.id.clone();
113        let ip = speaker.ip_address;
114        self.ip_to_speaker.insert(ip, id.clone());
115        self.speakers.insert(id.clone(), speaker);
116        self.speaker_props
117            .entry(id)
118            .or_insert_with(PropertyBag::new);
119    }
120
121    fn speaker(&self, id: &SpeakerId) -> Option<&SpeakerInfo> {
122        self.speakers.get(id)
123    }
124
125    fn speakers(&self) -> Vec<SpeakerInfo> {
126        self.speakers.values().cloned().collect()
127    }
128
129    pub(crate) fn add_group(&mut self, group: GroupInfo) {
130        let id = group.id.clone();
131        // Update speaker_to_group mapping for all members
132        for member_id in &group.member_ids {
133            self.speaker_to_group.insert(member_id.clone(), id.clone());
134        }
135        self.groups.insert(id.clone(), group);
136        self.group_props.entry(id).or_insert_with(PropertyBag::new);
137    }
138
139    /// Get the group a speaker belongs to
140    #[allow(dead_code)]
141    pub(crate) fn get_group_for_speaker(&self, speaker_id: &SpeakerId) -> Option<&GroupInfo> {
142        let group_id = self.speaker_to_group.get(speaker_id)?;
143        self.groups.get(group_id)
144    }
145
146    /// Clear all groups and speaker_to_group mappings
147    ///
148    /// Used when processing topology updates to replace all group data
149    pub(crate) fn clear_groups(&mut self) {
150        self.groups.clear();
151        self.group_props.clear();
152        self.speaker_to_group.clear();
153    }
154
155    pub(crate) fn get<P: Property>(&self, speaker_id: &SpeakerId) -> Option<P> {
156        self.speaker_props.get(speaker_id)?.get::<P>()
157    }
158
159    pub(crate) fn set<P: Property>(&mut self, speaker_id: &SpeakerId, value: P) -> bool {
160        let bag = self
161            .speaker_props
162            .entry(speaker_id.clone())
163            .or_insert_with(PropertyBag::new);
164        bag.set(value)
165    }
166
167    pub(crate) fn get_group<P: Property>(&self, group_id: &GroupId) -> Option<P> {
168        self.group_props.get(group_id)?.get::<P>()
169    }
170
171    pub(crate) fn set_group<P: Property>(&mut self, group_id: &GroupId, value: P) -> bool {
172        let bag = self
173            .group_props
174            .entry(group_id.clone())
175            .or_insert_with(PropertyBag::new);
176        bag.set(value)
177    }
178
179    fn set_system<P: Property>(&mut self, value: P) -> bool {
180        self.system_props.set(value)
181    }
182
183    fn is_empty(&self) -> bool {
184        self.speakers.is_empty()
185    }
186
187    fn speaker_count(&self) -> usize {
188        self.speakers.len()
189    }
190
191    fn group_count(&self) -> usize {
192        self.groups.len()
193    }
194}
195
196// ============================================================================
197// PropertyBag - type-erased property storage
198// ============================================================================
199
200pub(crate) struct PropertyBag {
201    /// Map<TypeId, Box<dyn Any>> where Any is the property value
202    values: HashMap<TypeId, Box<dyn Any + Send + Sync>>,
203}
204
205impl PropertyBag {
206    pub(crate) fn new() -> Self {
207        Self {
208            values: HashMap::new(),
209        }
210    }
211
212    fn get<P: Property>(&self) -> Option<P> {
213        let type_id = TypeId::of::<P>();
214        self.values
215            .get(&type_id)
216            .and_then(|boxed| boxed.downcast_ref::<P>())
217            .cloned()
218    }
219
220    fn set<P: Property>(&mut self, value: P) -> bool {
221        let type_id = TypeId::of::<P>();
222        let current = self
223            .values
224            .get(&type_id)
225            .and_then(|boxed| boxed.downcast_ref::<P>());
226
227        if current != Some(&value) {
228            self.values.insert(type_id, Box::new(value));
229            true
230        } else {
231            false
232        }
233    }
234}
235
236// ============================================================================
237// StateManager - main entry point
238// ============================================================================
239
240/// Core state manager with sync-first API
241///
242/// All public methods are synchronous. Background event processing
243/// happens in a dedicated thread.
244pub struct StateManager {
245    /// Property values storage
246    store: Arc<RwLock<StateStore>>,
247
248    /// Watched properties for iter() filtering
249    watched: Arc<RwLock<HashSet<(SpeakerId, &'static str)>>>,
250
251    /// IP to speaker ID mapping (for event worker)
252    ip_to_speaker: Arc<RwLock<HashMap<IpAddr, SpeakerId>>>,
253
254    /// Event manager (set-once via OnceLock — enables live events)
255    event_manager: OnceLock<Arc<SonosEventManager>>,
256
257    /// Channel for sending change events to iter()
258    event_tx: mpsc::Sender<ChangeEvent>,
259
260    /// Receiver for iter() - wrapped in `Arc<Mutex>` for cloning
261    event_rx: Arc<Mutex<mpsc::Receiver<ChangeEvent>>>,
262
263    /// Background event processor handle (lazily spawned)
264    _worker: Mutex<Option<JoinHandle<()>>>,
265
266    /// Cleanup timeout for subscriptions
267    cleanup_timeout: Duration,
268
269    /// Maps property key → Service for WatchRegistry's unregister_watches_for_service.
270    /// Shared with StateWatchRegistry via Arc.
271    key_to_service: Arc<RwLock<HashMap<&'static str, Service>>>,
272}
273
274// ============================================================================
275// StateWatchRegistry - WatchRegistry impl for SonosEventManager
276// ============================================================================
277
278/// Lightweight WatchRegistry implementation wired into the event manager.
279///
280/// Separated from StateManager because `mpsc::Sender` is `!Sync`,
281/// preventing StateManager itself from satisfying `WatchRegistry: Sync`.
282/// This struct holds only the Arc-wrapped fields needed for watch management.
283struct StateWatchRegistry {
284    watched: Arc<RwLock<HashSet<(SpeakerId, &'static str)>>>,
285    ip_to_speaker: Arc<RwLock<HashMap<IpAddr, SpeakerId>>>,
286    key_to_service: Arc<RwLock<HashMap<&'static str, Service>>>,
287}
288
289impl WatchRegistry for StateWatchRegistry {
290    fn register_watch(&self, speaker_id: &SpeakerId, key: &'static str, service: Service) {
291        self.watched.write().insert((speaker_id.clone(), key));
292        self.key_to_service.write().insert(key, service);
293    }
294
295    fn unregister_watches_for_service(&self, ip: IpAddr, service: Service) {
296        // 1. Resolve IP → SpeakerId
297        let speaker_id = match self.ip_to_speaker.read().get(&ip).cloned() {
298            Some(id) => id,
299            None => {
300                tracing::warn!(
301                    "unregister_watches_for_service: no speaker found for IP {}",
302                    ip
303                );
304                return;
305            }
306        };
307
308        // 2. Find property keys belonging to this service
309        let service_keys: Vec<&'static str> = self
310            .key_to_service
311            .read()
312            .iter()
313            .filter(|(_, &svc)| svc == service)
314            .map(|(&key, _)| key)
315            .collect();
316
317        // 3. Remove matching (speaker_id, key) entries from watched set
318        let mut watched = self.watched.write();
319        for key in service_keys {
320            watched.remove(&(speaker_id.clone(), key));
321        }
322    }
323}
324
325impl StateManager {
326    /// Create a new StateManager with default settings (sync)
327    ///
328    /// # Example
329    ///
330    /// ```rust,ignore
331    /// let manager = StateManager::new()?;
332    /// ```
333    pub fn new() -> Result<Self> {
334        Self::builder().build()
335    }
336
337    /// Create a StateManager builder for custom configuration
338    pub fn builder() -> StateManagerBuilder {
339        StateManagerBuilder::default()
340    }
341
342    /// Add discovered devices (sync)
343    ///
344    /// # Example
345    ///
346    /// ```rust,ignore
347    /// let devices = sonos_discovery::get();
348    /// manager.add_devices(devices)?;
349    /// ```
350    pub fn add_devices(&self, devices: Vec<Device>) -> Result<()> {
351        let mut store = self.store.write();
352        let mut ip_map = self.ip_to_speaker.write();
353
354        for device in devices {
355            let speaker_id = SpeakerId::new(&device.id);
356            let ip: IpAddr = device
357                .ip_address
358                .parse()
359                .map_err(|_| StateError::InvalidIpAddress(device.ip_address.clone()))?;
360
361            let friendly_name = if device.room_name.is_empty() || device.room_name == "Unknown" {
362                device.name.clone()
363            } else {
364                device.room_name.clone()
365            };
366
367            let info = SpeakerInfo {
368                id: speaker_id.clone(),
369                name: friendly_name,
370                room_name: device.room_name.clone(),
371                ip_address: ip,
372                port: device.port,
373                model_name: device.model_name.clone(),
374                software_version: "unknown".to_string(),
375                boot_seq: 0,
376                satellites: vec![],
377            };
378
379            // Update ip_to_speaker mapping
380            ip_map.insert(ip, speaker_id.clone());
381            tracing::debug!(
382                "Added speaker {} at IP {} to ip_to_speaker map",
383                speaker_id.as_str(),
384                ip
385            );
386
387            store.add_speaker(info);
388        }
389
390        // Also add devices to event manager if present
391        drop(store);
392        drop(ip_map);
393
394        if let Some(em) = self.event_manager.get() {
395            let devices_for_em: Vec<_> = self
396                .speaker_infos()
397                .iter()
398                .map(|info| sonos_discovery::Device {
399                    id: info.id.as_str().to_string(),
400                    name: info.name.clone(),
401                    room_name: info.room_name.clone(),
402                    ip_address: info.ip_address.to_string(),
403                    port: info.port,
404                    model_name: info.model_name.clone(),
405                })
406                .collect();
407
408            if let Err(e) = em.add_devices(devices_for_em) {
409                tracing::warn!("Failed to add devices to event manager: {}", e);
410            }
411        }
412
413        Ok(())
414    }
415
416    /// Get all speaker info
417    pub fn speaker_infos(&self) -> Vec<SpeakerInfo> {
418        self.store.read().speakers()
419    }
420
421    /// Get a specific speaker info by ID
422    pub fn speaker_info(&self, speaker_id: &SpeakerId) -> Option<SpeakerInfo> {
423        self.store.read().speaker(speaker_id).cloned()
424    }
425
426    /// Get speaker IP by ID
427    pub fn get_speaker_ip(&self, speaker_id: &SpeakerId) -> Option<IpAddr> {
428        self.store.read().speaker(speaker_id).map(|s| s.ip_address)
429    }
430
431    /// Get boot_seq for a speaker (used by GroupManagement AddMember)
432    pub fn get_boot_seq(&self, speaker_id: &SpeakerId) -> Option<u32> {
433        self.store.read().speaker(speaker_id).map(|s| s.boot_seq)
434    }
435
436    /// Create a blocking iterator over change events
437    ///
438    /// Only emits events for properties that have been watched.
439    ///
440    /// # Example
441    ///
442    /// ```rust,ignore
443    /// // First, watch some properties
444    /// speaker.volume.watch()?;
445    ///
446    /// // Then iterate over changes
447    /// for event in manager.iter() {
448    ///     println!("Changed: {} on {}", event.property_key, event.speaker_id);
449    /// }
450    /// ```
451    pub fn iter(&self) -> ChangeIterator {
452        ChangeIterator::new(Arc::clone(&self.event_rx))
453    }
454
455    /// Get current property value (sync, no subscription)
456    pub fn get_property<P: Property>(&self, speaker_id: &SpeakerId) -> Option<P> {
457        self.store.read().get::<P>(speaker_id)
458    }
459
460    /// Get current group property value (sync, no subscription)
461    pub fn get_group_property<P: Property>(&self, group_id: &GroupId) -> Option<P> {
462        self.store.read().get_group::<P>(group_id)
463    }
464
465    /// Set a property value
466    ///
467    /// Updates the property value in the store and emits a change event
468    /// if the property is being watched.
469    pub fn set_property<P: SonosProperty>(&self, speaker_id: &SpeakerId, value: P) {
470        let changed = {
471            let mut store = self.store.write();
472            store.set::<P>(speaker_id, value)
473        };
474
475        if changed {
476            self.maybe_emit_change(speaker_id, P::KEY, P::SERVICE);
477        }
478    }
479
480    /// Set a group property value
481    ///
482    /// Updates the group property value in the store and emits a change event
483    /// if the property is being watched (keyed on the coordinator's speaker ID).
484    /// Used by the SDK layer to store group-scoped values fetched via API calls.
485    pub fn set_group_property<P: SonosProperty>(&self, group_id: &GroupId, value: P) {
486        let coordinator_id = {
487            let mut store = self.store.write();
488            let changed = store.set_group::<P>(group_id, value);
489            if !changed {
490                return;
491            }
492            store.groups.get(group_id).map(|g| g.coordinator_id.clone())
493        };
494
495        if let Some(coordinator_id) = coordinator_id {
496            self.maybe_emit_change(&coordinator_id, P::KEY, P::SERVICE);
497        }
498    }
499
500    /// Register a property as watched (called by PropertyHandle::watch)
501    pub fn register_watch(&self, speaker_id: &SpeakerId, property_key: &'static str) {
502        self.watched
503            .write()
504            .insert((speaker_id.clone(), property_key));
505    }
506
507    /// Unregister a property watch
508    pub fn unregister_watch(&self, speaker_id: &SpeakerId, property_key: &'static str) {
509        self.watched
510            .write()
511            .remove(&(speaker_id.clone(), property_key));
512    }
513
514    /// Watch a property with automatic UPnP subscription (recommended API)
515    ///
516    /// This is the preferred method for watching properties as it:
517    /// 1. Registers the property for change notifications
518    /// 2. Subscribes to the UPnP service via the event manager
519    ///
520    /// Returns the current cached value if available.
521    pub fn watch_property_with_subscription<P: SonosProperty>(
522        &self,
523        speaker_id: &SpeakerId,
524    ) -> Result<Option<P>> {
525        // Register for change notifications
526        self.register_watch(speaker_id, P::KEY);
527
528        // Subscribe via event manager if available
529        if let Some(em) = self.event_manager.get() {
530            // Get speaker IP from store
531            if let Some(ip) = self.get_speaker_ip(speaker_id) {
532                if let Err(e) = em.ensure_service_subscribed(ip, P::SERVICE) {
533                    tracing::warn!(
534                        "Failed to subscribe to {:?} for {}: {}",
535                        P::SERVICE,
536                        speaker_id.as_str(),
537                        e
538                    );
539                }
540            }
541        }
542
543        Ok(self.get_property::<P>(speaker_id))
544    }
545
546    /// Unwatch a property and release UPnP subscription
547    pub fn unwatch_property_with_subscription<P: SonosProperty>(&self, speaker_id: &SpeakerId) {
548        // Unregister from change notifications
549        self.unregister_watch(speaker_id, P::KEY);
550
551        // Release subscription via event manager if available
552        if let Some(em) = self.event_manager.get() {
553            if let Some(ip) = self.get_speaker_ip(speaker_id) {
554                if let Err(e) = em.release_service_subscription(ip, P::SERVICE) {
555                    tracing::warn!(
556                        "Failed to unsubscribe from {:?} for {}: {}",
557                        P::SERVICE,
558                        speaker_id.as_str(),
559                        e
560                    );
561                }
562            }
563        }
564    }
565
566    /// Check if a property is being watched
567    pub fn is_watched(&self, speaker_id: &SpeakerId, property_key: &'static str) -> bool {
568        self.watched
569            .read()
570            .contains(&(speaker_id.clone(), property_key))
571    }
572
573    /// Emit a change event if the property is being watched
574    fn maybe_emit_change(
575        &self,
576        speaker_id: &SpeakerId,
577        property_key: &'static str,
578        service: Service,
579    ) {
580        let is_watched = self
581            .watched
582            .read()
583            .contains(&(speaker_id.clone(), property_key));
584
585        if is_watched {
586            let event = ChangeEvent::new(speaker_id.clone(), property_key, service);
587            let _ = self.event_tx.send(event);
588        }
589    }
590
591    /// Initialize from topology data
592    pub fn initialize(&self, topology: Topology) {
593        let mut store = self.store.write();
594        for speaker in &topology.speakers {
595            store.add_speaker(speaker.clone());
596        }
597        for group in &topology.groups {
598            store.add_group(group.clone());
599        }
600        store.set_system(topology);
601    }
602
603    /// Check if initialized with any speakers
604    pub fn is_initialized(&self) -> bool {
605        !self.store.read().is_empty()
606    }
607
608    /// Get number of speakers
609    pub fn speaker_count(&self) -> usize {
610        self.store.read().speaker_count()
611    }
612
613    /// Get number of groups
614    pub fn group_count(&self) -> usize {
615        self.store.read().group_count()
616    }
617
618    /// Get all current groups
619    ///
620    /// Returns all groups in the system. Every speaker is always in a group,
621    /// so a single speaker forms a group of one.
622    pub fn groups(&self) -> Vec<GroupInfo> {
623        self.store.read().groups.values().cloned().collect()
624    }
625
626    /// Get a specific group by ID
627    pub fn get_group(&self, group_id: &GroupId) -> Option<GroupInfo> {
628        self.store.read().groups.get(group_id).cloned()
629    }
630
631    /// Get the group a speaker belongs to
632    ///
633    /// Uses the speaker_to_group mapping for quick lookup.
634    pub fn get_group_for_speaker(&self, speaker_id: &SpeakerId) -> Option<GroupInfo> {
635        let store = self.store.read();
636        let group_id = store.speaker_to_group.get(speaker_id)?;
637        store.groups.get(group_id).cloned()
638    }
639
640    /// Get access to the event manager (if configured)
641    ///
642    /// This allows PropertyHandle::watch() to trigger UPnP subscriptions
643    /// via the event manager's ensure_service_subscribed() method.
644    pub fn event_manager(&self) -> Option<&Arc<SonosEventManager>> {
645        self.event_manager.get()
646    }
647
648    /// Wire an event manager into this StateManager after construction.
649    ///
650    /// Spawns the event worker thread and registers all known devices.
651    /// Can only be called once — subsequent calls are no-ops.
652    pub fn set_event_manager(&self, em: Arc<SonosEventManager>) -> Result<()> {
653        if self.event_manager.set(Arc::clone(&em)).is_err() {
654            return Ok(()); // Already set — no-op
655        }
656
657        // Wire this StateManager as the WatchRegistry
658        em.set_watch_registry(Arc::new(StateWatchRegistry {
659            watched: Arc::clone(&self.watched),
660            ip_to_speaker: Arc::clone(&self.ip_to_speaker),
661            key_to_service: Arc::clone(&self.key_to_service),
662        }));
663
664        // Register all known devices with the event manager
665        let devices_for_em: Vec<_> = self
666            .speaker_infos()
667            .iter()
668            .map(|info| sonos_discovery::Device {
669                id: info.id.as_str().to_string(),
670                name: info.name.clone(),
671                room_name: info.room_name.clone(),
672                ip_address: info.ip_address.to_string(),
673                port: info.port,
674                model_name: info.model_name.clone(),
675            })
676            .collect();
677
678        if let Err(e) = em.add_devices(devices_for_em) {
679            tracing::warn!(
680                "Failed to add devices to event manager during lazy init: {}",
681                e
682            );
683        }
684
685        // Spawn event worker thread
686        let worker = spawn_state_event_worker(
687            em,
688            Arc::clone(&self.store),
689            Arc::clone(&self.watched),
690            self.event_tx.clone(),
691            Arc::clone(&self.ip_to_speaker),
692        );
693        info!("StateManager event worker started (lazy init)");
694
695        if let Ok(mut w) = self._worker.lock() {
696            *w = Some(worker);
697        }
698
699        Ok(())
700    }
701}
702
703impl Clone for StateManager {
704    fn clone(&self) -> Self {
705        let event_manager = OnceLock::new();
706        if let Some(em) = self.event_manager.get() {
707            let _ = event_manager.set(Arc::clone(em));
708        }
709        Self {
710            store: Arc::clone(&self.store),
711            watched: Arc::clone(&self.watched),
712            ip_to_speaker: Arc::clone(&self.ip_to_speaker),
713            event_manager,
714            event_tx: self.event_tx.clone(),
715            event_rx: Arc::clone(&self.event_rx),
716            _worker: Mutex::new(None),
717            cleanup_timeout: self.cleanup_timeout,
718            key_to_service: Arc::clone(&self.key_to_service),
719        }
720    }
721}
722
723// ============================================================================
724// StateManagerBuilder
725// ============================================================================
726
727/// Builder for StateManager configuration
728pub struct StateManagerBuilder {
729    cleanup_timeout: Duration,
730    event_manager: Option<Arc<SonosEventManager>>,
731}
732
733impl Default for StateManagerBuilder {
734    fn default() -> Self {
735        Self {
736            cleanup_timeout: Duration::from_secs(5),
737            event_manager: None,
738        }
739    }
740}
741
742impl StateManagerBuilder {
743    /// Set the cleanup timeout for subscriptions
744    pub fn cleanup_timeout(mut self, timeout: Duration) -> Self {
745        self.cleanup_timeout = timeout;
746        self
747    }
748
749    /// Set the event manager for live event processing
750    ///
751    /// When an event manager is provided, the StateManager will:
752    /// - Spawn a background worker to process events
753    /// - Automatically subscribe/unsubscribe via `watch()`/`unwatch()` on properties
754    /// - Update state from incoming events
755    pub fn with_event_manager(mut self, em: Arc<SonosEventManager>) -> Self {
756        self.event_manager = Some(em);
757        self
758    }
759
760    /// Build the StateManager
761    pub fn build(self) -> Result<StateManager> {
762        let (event_tx, event_rx) = mpsc::channel();
763
764        let store = Arc::new(RwLock::new(StateStore::new()));
765        let watched = Arc::new(RwLock::new(HashSet::new()));
766        let ip_to_speaker = Arc::new(RwLock::new(HashMap::new()));
767        let key_to_service = Arc::new(RwLock::new(HashMap::new()));
768
769        let event_manager_lock = OnceLock::new();
770        let mut worker = None;
771
772        // If event_manager provided at build time, wire it up eagerly
773        if let Some(em) = self.event_manager {
774            let _ = event_manager_lock.set(Arc::clone(&em));
775
776            // Wire WatchRegistry
777            em.set_watch_registry(Arc::new(StateWatchRegistry {
778                watched: Arc::clone(&watched),
779                ip_to_speaker: Arc::clone(&ip_to_speaker),
780                key_to_service: Arc::clone(&key_to_service),
781            }));
782
783            let worker_handle = spawn_state_event_worker(
784                em,
785                Arc::clone(&store),
786                Arc::clone(&watched),
787                event_tx.clone(),
788                Arc::clone(&ip_to_speaker),
789            );
790            info!("StateManager event worker started");
791            worker = Some(worker_handle);
792        }
793
794        let manager = StateManager {
795            store,
796            watched,
797            ip_to_speaker,
798            event_manager: event_manager_lock,
799            event_tx,
800            event_rx: Arc::new(Mutex::new(event_rx)),
801            _worker: Mutex::new(worker),
802            cleanup_timeout: self.cleanup_timeout,
803            key_to_service,
804        };
805
806        info!("StateManager created (sync-first mode)");
807        Ok(manager)
808    }
809}
810
811#[cfg(test)]
812mod tests {
813    use super::*;
814    use crate::property::{GroupVolume, Volume};
815    use sonos_api::Service;
816
817    #[test]
818    fn test_state_manager_creation() {
819        let manager = StateManager::new().unwrap();
820        assert!(!manager.is_initialized());
821        assert_eq!(manager.speaker_count(), 0);
822    }
823
824    #[test]
825    fn test_add_devices() {
826        let manager = StateManager::new().unwrap();
827
828        let devices = vec![Device {
829            id: "RINCON_123".to_string(),
830            name: "Living Room".to_string(),
831            room_name: "Living Room".to_string(),
832            ip_address: "192.168.1.100".to_string(),
833            port: 1400,
834            model_name: "Sonos One".to_string(),
835        }];
836
837        manager.add_devices(devices).unwrap();
838        assert_eq!(manager.speaker_count(), 1);
839    }
840
841    #[test]
842    fn test_property_storage() {
843        let manager = StateManager::new().unwrap();
844
845        let devices = vec![Device {
846            id: "RINCON_123".to_string(),
847            name: "Living Room".to_string(),
848            room_name: "Living Room".to_string(),
849            ip_address: "192.168.1.100".to_string(),
850            port: 1400,
851            model_name: "Sonos One".to_string(),
852        }];
853        manager.add_devices(devices).unwrap();
854
855        let speaker_id = SpeakerId::new("RINCON_123");
856
857        // Initially None
858        assert!(manager.get_property::<Volume>(&speaker_id).is_none());
859
860        // Set value
861        manager.set_property(&speaker_id, Volume::new(50));
862        assert_eq!(
863            manager.get_property::<Volume>(&speaker_id),
864            Some(Volume::new(50))
865        );
866    }
867
868    #[test]
869    fn test_watch_registration() {
870        let manager = StateManager::new().unwrap();
871
872        let devices = vec![Device {
873            id: "RINCON_123".to_string(),
874            name: "Living Room".to_string(),
875            room_name: "Living Room".to_string(),
876            ip_address: "192.168.1.100".to_string(),
877            port: 1400,
878            model_name: "Sonos One".to_string(),
879        }];
880        manager.add_devices(devices).unwrap();
881
882        let speaker_id = SpeakerId::new("RINCON_123");
883
884        // Not watched initially
885        assert!(!manager.is_watched(&speaker_id, "volume"));
886
887        // Register watch
888        manager.register_watch(&speaker_id, "volume");
889        assert!(manager.is_watched(&speaker_id, "volume"));
890
891        // Unregister watch
892        manager.unregister_watch(&speaker_id, "volume");
893        assert!(!manager.is_watched(&speaker_id, "volume"));
894    }
895
896    #[test]
897    fn test_change_event_emission() {
898        let manager = StateManager::new().unwrap();
899
900        let devices = vec![Device {
901            id: "RINCON_123".to_string(),
902            name: "Living Room".to_string(),
903            room_name: "Living Room".to_string(),
904            ip_address: "192.168.1.100".to_string(),
905            port: 1400,
906            model_name: "Sonos One".to_string(),
907        }];
908        manager.add_devices(devices).unwrap();
909
910        let speaker_id = SpeakerId::new("RINCON_123");
911
912        // Register watch
913        manager.register_watch(&speaker_id, "volume");
914
915        // Set property (should emit event)
916        manager.set_property(&speaker_id, Volume::new(75));
917
918        // Get event via iter
919        let iter = manager.iter();
920        let event = iter.recv_timeout(std::time::Duration::from_millis(100));
921        assert!(event.is_some());
922
923        let event = event.unwrap();
924        assert_eq!(event.speaker_id.as_str(), "RINCON_123");
925        assert_eq!(event.property_key, "volume");
926    }
927
928    #[test]
929    fn test_set_group_property_emits_change_event() {
930        let manager = StateManager::new().unwrap();
931
932        let devices = vec![Device {
933            id: "RINCON_123".to_string(),
934            name: "Living Room".to_string(),
935            room_name: "Living Room".to_string(),
936            ip_address: "192.168.1.100".to_string(),
937            port: 1400,
938            model_name: "Sonos One".to_string(),
939        }];
940        manager.add_devices(devices).unwrap();
941
942        let speaker_id = SpeakerId::new("RINCON_123");
943        let group_id = GroupId::new("RINCON_123:1");
944
945        // Add group so coordinator lookup works
946        {
947            let mut store = manager.store.write();
948            store.add_group(GroupInfo::new(
949                group_id.clone(),
950                speaker_id.clone(),
951                vec![speaker_id.clone()],
952            ));
953        }
954
955        // Register watch on coordinator for group_volume
956        manager.register_watch(&speaker_id, "group_volume");
957
958        // Set group property (should emit event via coordinator)
959        manager.set_group_property(&group_id, GroupVolume::new(80));
960
961        // Verify event was emitted
962        let iter = manager.iter();
963        let event = iter.recv_timeout(std::time::Duration::from_millis(100));
964        assert!(event.is_some());
965
966        let event = event.unwrap();
967        assert_eq!(event.speaker_id.as_str(), "RINCON_123");
968        assert_eq!(event.property_key, "group_volume");
969        assert_eq!(event.service, Service::GroupRenderingControl);
970    }
971
972    #[test]
973    fn test_set_group_property_no_event_when_unwatched() {
974        let manager = StateManager::new().unwrap();
975
976        let devices = vec![Device {
977            id: "RINCON_123".to_string(),
978            name: "Living Room".to_string(),
979            room_name: "Living Room".to_string(),
980            ip_address: "192.168.1.100".to_string(),
981            port: 1400,
982            model_name: "Sonos One".to_string(),
983        }];
984        manager.add_devices(devices).unwrap();
985
986        let speaker_id = SpeakerId::new("RINCON_123");
987        let group_id = GroupId::new("RINCON_123:1");
988
989        {
990            let mut store = manager.store.write();
991            store.add_group(GroupInfo::new(
992                group_id.clone(),
993                speaker_id.clone(),
994                vec![speaker_id.clone()],
995            ));
996        }
997
998        // Don't register any watch
999        manager.set_group_property(&group_id, GroupVolume::new(50));
1000
1001        let iter = manager.iter();
1002        let event = iter.recv_timeout(std::time::Duration::from_millis(100));
1003        assert!(event.is_none());
1004    }
1005
1006    // ========================================================================
1007    // StateStore Group Operations Tests
1008    // ========================================================================
1009
1010    #[test]
1011    fn test_add_group_updates_speaker_to_group() {
1012        let mut store = StateStore::new();
1013
1014        let speaker1 = SpeakerId::new("RINCON_111");
1015        let speaker2 = SpeakerId::new("RINCON_222");
1016        let group_id = GroupId::new("RINCON_111:1");
1017
1018        let group = GroupInfo::new(
1019            group_id.clone(),
1020            speaker1.clone(),
1021            vec![speaker1.clone(), speaker2.clone()],
1022        );
1023
1024        store.add_group(group);
1025
1026        // Verify speaker_to_group mapping is updated for all members
1027        assert_eq!(store.speaker_to_group.get(&speaker1), Some(&group_id));
1028        assert_eq!(store.speaker_to_group.get(&speaker2), Some(&group_id));
1029    }
1030
1031    #[test]
1032    fn test_add_group_single_speaker() {
1033        let mut store = StateStore::new();
1034
1035        let speaker = SpeakerId::new("RINCON_333");
1036        let group_id = GroupId::new("RINCON_333:1");
1037
1038        let group = GroupInfo::new(group_id.clone(), speaker.clone(), vec![speaker.clone()]);
1039
1040        store.add_group(group.clone());
1041
1042        // Verify speaker_to_group mapping
1043        assert_eq!(store.speaker_to_group.get(&speaker), Some(&group_id));
1044
1045        // Verify group is stored
1046        assert_eq!(store.groups.get(&group_id), Some(&group));
1047    }
1048
1049    #[test]
1050    fn test_get_group_for_speaker_returns_correct_group() {
1051        let mut store = StateStore::new();
1052
1053        let speaker1 = SpeakerId::new("RINCON_111");
1054        let speaker2 = SpeakerId::new("RINCON_222");
1055        let speaker3 = SpeakerId::new("RINCON_333");
1056        let group1_id = GroupId::new("RINCON_111:1");
1057        let group2_id = GroupId::new("RINCON_333:1");
1058
1059        // Group 1: speaker1 (coordinator) + speaker2
1060        let group1 = GroupInfo::new(
1061            group1_id.clone(),
1062            speaker1.clone(),
1063            vec![speaker1.clone(), speaker2.clone()],
1064        );
1065
1066        // Group 2: speaker3 alone
1067        let group2 = GroupInfo::new(group2_id.clone(), speaker3.clone(), vec![speaker3.clone()]);
1068
1069        store.add_group(group1.clone());
1070        store.add_group(group2.clone());
1071
1072        // Verify get_group_for_speaker returns correct groups
1073        assert_eq!(store.get_group_for_speaker(&speaker1), Some(&group1));
1074        assert_eq!(store.get_group_for_speaker(&speaker2), Some(&group1));
1075        assert_eq!(store.get_group_for_speaker(&speaker3), Some(&group2));
1076    }
1077
1078    #[test]
1079    fn test_get_group_for_speaker_returns_none_for_unknown() {
1080        let store = StateStore::new();
1081
1082        let unknown_speaker = SpeakerId::new("RINCON_UNKNOWN");
1083
1084        assert!(store.get_group_for_speaker(&unknown_speaker).is_none());
1085    }
1086
1087    #[test]
1088    fn test_clear_groups_removes_all_group_data() {
1089        let mut store = StateStore::new();
1090
1091        let speaker1 = SpeakerId::new("RINCON_111");
1092        let speaker2 = SpeakerId::new("RINCON_222");
1093        let group_id = GroupId::new("RINCON_111:1");
1094
1095        let group = GroupInfo::new(
1096            group_id.clone(),
1097            speaker1.clone(),
1098            vec![speaker1.clone(), speaker2.clone()],
1099        );
1100
1101        store.add_group(group);
1102
1103        // Verify data exists
1104        assert!(!store.groups.is_empty());
1105        assert!(!store.speaker_to_group.is_empty());
1106
1107        // Clear groups
1108        store.clear_groups();
1109
1110        // Verify all group data is cleared
1111        assert!(store.groups.is_empty());
1112        assert!(store.group_props.is_empty());
1113        assert!(store.speaker_to_group.is_empty());
1114    }
1115
1116    #[test]
1117    fn test_clear_groups_then_add_new_groups() {
1118        let mut store = StateStore::new();
1119
1120        // Add initial group
1121        let speaker1 = SpeakerId::new("RINCON_111");
1122        let group1_id = GroupId::new("RINCON_111:1");
1123        let group1 = GroupInfo::new(group1_id.clone(), speaker1.clone(), vec![speaker1.clone()]);
1124        store.add_group(group1);
1125
1126        // Clear and add new group
1127        store.clear_groups();
1128
1129        let speaker2 = SpeakerId::new("RINCON_222");
1130        let group2_id = GroupId::new("RINCON_222:1");
1131        let group2 = GroupInfo::new(group2_id.clone(), speaker2.clone(), vec![speaker2.clone()]);
1132        store.add_group(group2.clone());
1133
1134        // Verify old group is gone, new group exists
1135        assert!(!store.groups.contains_key(&group1_id));
1136        assert_eq!(store.groups.get(&group2_id), Some(&group2));
1137
1138        // Verify speaker_to_group is updated correctly
1139        assert!(!store.speaker_to_group.contains_key(&speaker1));
1140        assert_eq!(store.speaker_to_group.get(&speaker2), Some(&group2_id));
1141    }
1142
1143    // ========================================================================
1144    // StateManager Group Methods Tests
1145    // ========================================================================
1146
1147    #[test]
1148    fn test_state_manager_groups_returns_all_groups() {
1149        let manager = StateManager::new().unwrap();
1150
1151        // Add devices
1152        let devices = vec![
1153            Device {
1154                id: "RINCON_111".to_string(),
1155                name: "Living Room".to_string(),
1156                room_name: "Living Room".to_string(),
1157                ip_address: "192.168.1.100".to_string(),
1158                port: 1400,
1159                model_name: "Sonos One".to_string(),
1160            },
1161            Device {
1162                id: "RINCON_222".to_string(),
1163                name: "Kitchen".to_string(),
1164                room_name: "Kitchen".to_string(),
1165                ip_address: "192.168.1.101".to_string(),
1166                port: 1400,
1167                model_name: "Sonos One".to_string(),
1168            },
1169        ];
1170        manager.add_devices(devices).unwrap();
1171
1172        // Create groups via initialize
1173        let speaker1 = SpeakerId::new("RINCON_111");
1174        let speaker2 = SpeakerId::new("RINCON_222");
1175        let group1 = GroupInfo::new(
1176            GroupId::new("RINCON_111:1"),
1177            speaker1.clone(),
1178            vec![speaker1.clone()],
1179        );
1180        let group2 = GroupInfo::new(
1181            GroupId::new("RINCON_222:1"),
1182            speaker2.clone(),
1183            vec![speaker2.clone()],
1184        );
1185
1186        let topology = Topology::new(
1187            manager.speaker_infos(),
1188            vec![group1.clone(), group2.clone()],
1189        );
1190        manager.initialize(topology);
1191
1192        // Verify groups() returns all groups
1193        let groups = manager.groups();
1194        assert_eq!(groups.len(), 2);
1195
1196        // Verify both groups are present (order may vary)
1197        let group_ids: Vec<_> = groups.iter().map(|g| g.id.clone()).collect();
1198        assert!(group_ids.contains(&GroupId::new("RINCON_111:1")));
1199        assert!(group_ids.contains(&GroupId::new("RINCON_222:1")));
1200    }
1201
1202    #[test]
1203    fn test_state_manager_groups_returns_empty_when_no_groups() {
1204        let manager = StateManager::new().unwrap();
1205
1206        // No groups added
1207        let groups = manager.groups();
1208        assert!(groups.is_empty());
1209    }
1210
1211    #[test]
1212    fn test_state_manager_get_group_returns_correct_group() {
1213        let manager = StateManager::new().unwrap();
1214
1215        // Add device
1216        let devices = vec![Device {
1217            id: "RINCON_111".to_string(),
1218            name: "Living Room".to_string(),
1219            room_name: "Living Room".to_string(),
1220            ip_address: "192.168.1.100".to_string(),
1221            port: 1400,
1222            model_name: "Sonos One".to_string(),
1223        }];
1224        manager.add_devices(devices).unwrap();
1225
1226        // Create group via initialize
1227        let speaker = SpeakerId::new("RINCON_111");
1228        let group_id = GroupId::new("RINCON_111:1");
1229        let group = GroupInfo::new(group_id.clone(), speaker.clone(), vec![speaker.clone()]);
1230
1231        let topology = Topology::new(manager.speaker_infos(), vec![group.clone()]);
1232        manager.initialize(topology);
1233
1234        // Verify get_group returns the correct group
1235        let found = manager.get_group(&group_id);
1236        assert!(found.is_some());
1237        assert_eq!(found.unwrap(), group);
1238    }
1239
1240    #[test]
1241    fn test_state_manager_get_group_returns_none_for_unknown() {
1242        let manager = StateManager::new().unwrap();
1243
1244        // No groups added
1245        let unknown_id = GroupId::new("RINCON_UNKNOWN:1");
1246        let found = manager.get_group(&unknown_id);
1247        assert!(found.is_none());
1248    }
1249
1250    #[test]
1251    fn test_state_manager_get_group_for_speaker_returns_correct_group() {
1252        let manager = StateManager::new().unwrap();
1253
1254        // Add devices
1255        let devices = vec![
1256            Device {
1257                id: "RINCON_111".to_string(),
1258                name: "Living Room".to_string(),
1259                room_name: "Living Room".to_string(),
1260                ip_address: "192.168.1.100".to_string(),
1261                port: 1400,
1262                model_name: "Sonos One".to_string(),
1263            },
1264            Device {
1265                id: "RINCON_222".to_string(),
1266                name: "Kitchen".to_string(),
1267                room_name: "Kitchen".to_string(),
1268                ip_address: "192.168.1.101".to_string(),
1269                port: 1400,
1270                model_name: "Sonos One".to_string(),
1271            },
1272        ];
1273        manager.add_devices(devices).unwrap();
1274
1275        // Create a group with both speakers
1276        let speaker1 = SpeakerId::new("RINCON_111");
1277        let speaker2 = SpeakerId::new("RINCON_222");
1278        let group_id = GroupId::new("RINCON_111:1");
1279        let group = GroupInfo::new(
1280            group_id.clone(),
1281            speaker1.clone(),
1282            vec![speaker1.clone(), speaker2.clone()],
1283        );
1284
1285        let topology = Topology::new(manager.speaker_infos(), vec![group.clone()]);
1286        manager.initialize(topology);
1287
1288        // Verify get_group_for_speaker returns the correct group for both speakers
1289        let found1 = manager.get_group_for_speaker(&speaker1);
1290        assert!(found1.is_some());
1291        assert_eq!(found1.unwrap(), group);
1292
1293        let found2 = manager.get_group_for_speaker(&speaker2);
1294        assert!(found2.is_some());
1295        assert_eq!(found2.unwrap(), group);
1296    }
1297
1298    #[test]
1299    fn test_state_manager_get_group_for_speaker_returns_none_for_unknown() {
1300        let manager = StateManager::new().unwrap();
1301
1302        // No groups added
1303        let unknown_speaker = SpeakerId::new("RINCON_UNKNOWN");
1304        let found = manager.get_group_for_speaker(&unknown_speaker);
1305        assert!(found.is_none());
1306    }
1307
1308    #[test]
1309    fn test_state_manager_group_methods_consistency() {
1310        let manager = StateManager::new().unwrap();
1311
1312        // Add device
1313        let devices = vec![Device {
1314            id: "RINCON_111".to_string(),
1315            name: "Living Room".to_string(),
1316            room_name: "Living Room".to_string(),
1317            ip_address: "192.168.1.100".to_string(),
1318            port: 1400,
1319            model_name: "Sonos One".to_string(),
1320        }];
1321        manager.add_devices(devices).unwrap();
1322
1323        // Create group via initialize
1324        let speaker = SpeakerId::new("RINCON_111");
1325        let group_id = GroupId::new("RINCON_111:1");
1326        let group = GroupInfo::new(group_id.clone(), speaker.clone(), vec![speaker.clone()]);
1327
1328        let topology = Topology::new(manager.speaker_infos(), vec![group.clone()]);
1329        manager.initialize(topology);
1330
1331        // Verify all three methods return consistent data
1332        let groups = manager.groups();
1333        assert_eq!(groups.len(), 1);
1334        assert_eq!(groups[0], group);
1335
1336        let by_id = manager.get_group(&group_id);
1337        assert_eq!(by_id, Some(group.clone()));
1338
1339        let by_speaker = manager.get_group_for_speaker(&speaker);
1340        assert_eq!(by_speaker, Some(group.clone()));
1341
1342        // All should return the same group
1343        assert_eq!(groups[0], by_id.unwrap());
1344        assert_eq!(groups[0], by_speaker.unwrap());
1345    }
1346
1347    // ========================================================================
1348    // boot_seq Tests
1349    // ========================================================================
1350
1351    #[test]
1352    fn test_get_boot_seq_returns_none_for_unknown_speaker() {
1353        let manager = StateManager::new().unwrap();
1354        let unknown = SpeakerId::new("RINCON_UNKNOWN");
1355        assert!(manager.get_boot_seq(&unknown).is_none());
1356    }
1357
1358    #[test]
1359    fn test_boot_seq_defaults_to_zero_for_new_speaker() {
1360        let manager = StateManager::new().unwrap();
1361
1362        let devices = vec![Device {
1363            id: "RINCON_123".to_string(),
1364            name: "Living Room".to_string(),
1365            room_name: "Living Room".to_string(),
1366            ip_address: "192.168.1.100".to_string(),
1367            port: 1400,
1368            model_name: "Sonos One".to_string(),
1369        }];
1370        manager.add_devices(devices).unwrap();
1371
1372        let speaker_id = SpeakerId::new("RINCON_123");
1373
1374        // Before any topology event, boot_seq should be 0
1375        assert_eq!(manager.get_boot_seq(&speaker_id), Some(0));
1376    }
1377
1378    // ========================================================================
1379    // StateWatchRegistry Tests
1380    // ========================================================================
1381
1382    #[test]
1383    fn test_state_watch_registry_register_and_unregister() {
1384        let watched = Arc::new(RwLock::new(HashSet::new()));
1385        let ip_to_speaker = Arc::new(RwLock::new(HashMap::new()));
1386        let key_to_service = Arc::new(RwLock::new(HashMap::new()));
1387
1388        let ip: IpAddr = "192.168.1.100".parse().unwrap();
1389        let speaker_id = SpeakerId::new("RINCON_123");
1390        ip_to_speaker.write().insert(ip, speaker_id.clone());
1391
1392        let registry = StateWatchRegistry {
1393            watched: Arc::clone(&watched),
1394            ip_to_speaker: Arc::clone(&ip_to_speaker),
1395            key_to_service: Arc::clone(&key_to_service),
1396        };
1397
1398        // Register watches on two services
1399        registry.register_watch(&speaker_id, "volume", Service::RenderingControl);
1400        registry.register_watch(&speaker_id, "mute", Service::RenderingControl);
1401        registry.register_watch(&speaker_id, "playback_state", Service::AVTransport);
1402
1403        assert_eq!(watched.read().len(), 3);
1404
1405        // Unregister RenderingControl — should remove volume + mute, keep playback_state
1406        registry.unregister_watches_for_service(ip, Service::RenderingControl);
1407
1408        let w = watched.read();
1409        assert_eq!(w.len(), 1);
1410        assert!(w.contains(&(speaker_id.clone(), "playback_state")));
1411        assert!(!w.contains(&(speaker_id.clone(), "volume")));
1412        assert!(!w.contains(&(speaker_id.clone(), "mute")));
1413    }
1414
1415    #[test]
1416    fn test_state_watch_registry_unknown_ip_is_noop() {
1417        let watched = Arc::new(RwLock::new(HashSet::new()));
1418        let ip_to_speaker = Arc::new(RwLock::new(HashMap::new()));
1419        let key_to_service = Arc::new(RwLock::new(HashMap::new()));
1420
1421        let speaker_id = SpeakerId::new("RINCON_123");
1422
1423        let registry = StateWatchRegistry {
1424            watched: Arc::clone(&watched),
1425            ip_to_speaker,
1426            key_to_service: Arc::clone(&key_to_service),
1427        };
1428
1429        // Register a watch (simulating direct add to shared set)
1430        watched.write().insert((speaker_id.clone(), "volume"));
1431        key_to_service
1432            .write()
1433            .insert("volume", Service::RenderingControl);
1434
1435        // Unregister for an unknown IP — should be a no-op
1436        let unknown_ip: IpAddr = "10.0.0.1".parse().unwrap();
1437        registry.unregister_watches_for_service(unknown_ip, Service::RenderingControl);
1438
1439        // Watch should still be there
1440        assert_eq!(watched.read().len(), 1);
1441    }
1442
1443    #[test]
1444    fn test_state_watch_registry_only_removes_matching_speaker() {
1445        let watched = Arc::new(RwLock::new(HashSet::new()));
1446        let ip_to_speaker = Arc::new(RwLock::new(HashMap::new()));
1447        let key_to_service = Arc::new(RwLock::new(HashMap::new()));
1448
1449        let ip1: IpAddr = "192.168.1.100".parse().unwrap();
1450        let ip2: IpAddr = "192.168.1.101".parse().unwrap();
1451        let speaker1 = SpeakerId::new("RINCON_111");
1452        let speaker2 = SpeakerId::new("RINCON_222");
1453
1454        ip_to_speaker.write().insert(ip1, speaker1.clone());
1455        ip_to_speaker.write().insert(ip2, speaker2.clone());
1456
1457        let registry = StateWatchRegistry {
1458            watched: Arc::clone(&watched),
1459            ip_to_speaker,
1460            key_to_service: Arc::clone(&key_to_service),
1461        };
1462
1463        // Both speakers watch volume
1464        registry.register_watch(&speaker1, "volume", Service::RenderingControl);
1465        registry.register_watch(&speaker2, "volume", Service::RenderingControl);
1466        assert_eq!(watched.read().len(), 2);
1467
1468        // Unregister only speaker1's IP
1469        registry.unregister_watches_for_service(ip1, Service::RenderingControl);
1470
1471        let w = watched.read();
1472        assert_eq!(w.len(), 1);
1473        assert!(w.contains(&(speaker2.clone(), "volume")));
1474        assert!(!w.contains(&(speaker1.clone(), "volume")));
1475    }
1476}